Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 36 additions & 7 deletions be/src/format/new_parquet/parquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,18 +327,47 @@ Status ParquetReader::_execute_filter_conjuncts(int64_t batch_rows, Block* file_
// predicate columns in the file-local block have been materialized.
for (const auto& expression_filter : _request->expression_filters) {
if (expression_filter.conjunct == nullptr) {
continue;
if (expression_filter.delete_conjunct == nullptr) {
continue;
}
} else {
if (*selected_rows == 0) {
break;
}
IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
bool can_filter_all = false;
RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(
file_block, filter.data(), static_cast<size_t>(batch_rows), false,
&can_filter_all));
*selected_rows = can_filter_all ? 0
: _apply_filter_to_selection(filter, selection,
*selected_rows);
}
if (*selected_rows == 0) {
break;
}
IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
bool can_filter_all = false;
RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(file_block, filter.data(),
static_cast<size_t>(batch_rows),
false, &can_filter_all));
if (expression_filter.delete_conjunct == nullptr) {
continue;
}
int result_column_id = -1;
RETURN_IF_ERROR(expression_filter.delete_conjunct->root()->execute(
expression_filter.delete_conjunct.get(), file_block, &result_column_id));
DORIS_CHECK(result_column_id >= 0 &&
result_column_id < static_cast<int>(file_block->columns()));
const auto& delete_filter = assert_cast<const ColumnUInt8&>(
*file_block->get_by_position(result_column_id).column)
.get_data();
DORIS_CHECK(delete_filter.size() == static_cast<size_t>(batch_rows));
IColumn::Filter keep_filter(static_cast<size_t>(batch_rows), 1);
bool has_kept_row = false;
for (size_t row = 0; row < static_cast<size_t>(batch_rows); ++row) {
keep_filter[row] = !delete_filter[row];
has_kept_row |= keep_filter[row] != 0;
}
file_block->erase(result_column_id);
*selected_rows =
can_filter_all ? 0 : _apply_filter_to_selection(filter, selection, *selected_rows);
!has_kept_row ? 0
: _apply_filter_to_selection(keep_filter, selection, *selected_rows);
}
return Status::OK();
}
Expand Down
14 changes: 7 additions & 7 deletions be/src/format/reader/expr/delete_predicate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,26 +69,26 @@ void DeletePredicate::close(VExprContext* context, FunctionContext::FunctionStat
* Row IDs should be generated by file reader as a virtual column in `block`.
**/
Status DeletePredicate::execute(VExprContext* context, Block* block, int* result_column_id) const {
if (block->empty()) {
return Status::OK();
}
DCHECK(_open_finished || block == nullptr);
if (_children.size() != 1) {
return Status::InternalError(fmt::format(
"DeletePredicate should have exactly 1 child expr, but got {}", _children.size()));
}
int slot = -1;
RETURN_IF_ERROR(_children[0]->execute(context, block, &slot));
const auto count = block->rows();
auto res_col = ColumnBool::create(block->rows(), 0);
const auto& row_ids =
assert_cast<const ColumnInt64&>(*block->get_by_position(slot).column).get_data();
DCHECK_EQ(row_ids.size(), count);
const auto count = row_ids.size();
auto res_col = ColumnBool::create(count, 0);
if (_deleted_rows.empty()) {
block->insert({std::move(res_col), std::make_shared<DataTypeBool>(), expr_name()});
*result_column_id = static_cast<int>(block->get_columns().size() - 1);
return Status::OK();
}
if (count == 0) {
block->insert({std::move(res_col), std::make_shared<DataTypeBool>(), expr_name()});
*result_column_id = static_cast<int>(block->get_columns().size() - 1);
return Status::OK();
}
const int64_t* delete_rows = _deleted_rows.data();
const int64_t* delete_rows_end = delete_rows + _deleted_rows.size();
const int64_t* start_pos = std::lower_bound(delete_rows, delete_rows_end, row_ids[0]);
Expand Down
35 changes: 24 additions & 11 deletions be/src/format/reader/table/paimon_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,39 @@

#include "format/reader/table/paimon_reader.h"

#include <cstring>
#include <string>

#include "format/table/deletion_vector_reader.h"

namespace doris::paimon {

bool PaimonReader::_parse_delete_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc& desc) {
Status PaimonReader::_parse_deletion_vector_file(const TTableFormatFileDesc& t_desc,
DeleteFileDesc* desc, bool* has_delete_file) {
DORIS_CHECK(desc != nullptr);
DORIS_CHECK(has_delete_file != nullptr);
*has_delete_file = false;
const auto& table_desc = t_desc.paimon_params;
if (!table_desc.__isset.deletion_file) {
return false;
return Status::OK();
}
const auto& deletion_file = table_desc.deletion_file;

desc.key.resize(deletion_file.path.size() + sizeof(deletion_file.offset));
memcpy(desc.key.data(), deletion_file.path.data(), deletion_file.path.size());
memcpy(desc.key.data() + deletion_file.path.size(), &deletion_file.offset,
sizeof(deletion_file.offset));
desc.path = deletion_file.path;
desc.start_offset = deletion_file.offset;
desc.size = deletion_file.length + 4;
desc.file_size = -1;
return true;
const std::string key_prefix = "paimon_dv:";
desc->key.resize(key_prefix.size() + deletion_file.path.size() + sizeof(deletion_file.offset));
char* key_data = desc->key.data();
memcpy(key_data, key_prefix.data(), key_prefix.size());
key_data += key_prefix.size();
memcpy(key_data, deletion_file.path.data(), deletion_file.path.size());
key_data += deletion_file.path.size();
memcpy(key_data, &deletion_file.offset, sizeof(deletion_file.offset));
desc->path = deletion_file.path;
desc->start_offset = deletion_file.offset;
desc->size = deletion_file.length + 4;
desc->file_size = -1;
desc->format = DeleteFileDesc::Format::PAIMON;
*has_delete_file = true;
return Status::OK();
}

} // namespace doris::paimon
3 changes: 2 additions & 1 deletion be/src/format/reader/table/paimon_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class PaimonReader final : public reader::TableReader {
~PaimonReader() final = default;

protected:
bool _parse_delete_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc& desc) override;
Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc, DeleteFileDesc* desc,
bool* has_delete_file) override;
};

} // namespace doris::paimon
118 changes: 77 additions & 41 deletions be/src/format/reader/table_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>

#include <cstring>
#include <stdexcept>
#include <set>
#include <vector>

#include "common/cast_set.h"
#include "common/status.h"
#include "core/assert_cast.h"
#include "exec/common/endian.h"
#include "exprs/vslot_ref.h"
#include "format/new_parquet/parquet_reader.h"
#include "format/reader/column_mapper.h"
#include "format/table/deletion_vector_reader.h"
#include "io/io_common.h"
#include "roaring/roaring64map.hh"

namespace doris::reader {
namespace {
Expand Down Expand Up @@ -66,10 +71,63 @@ void build_table_filters_from_conjunct(const VExprSPtr& conjunct,
table_filter.conjunct = VExprContext::create_shared(conjunct);
table_filter.slot_ids.assign(slot_ids.begin(), slot_ids.end());
table_filters->push_back(std::move(table_filter));
return;
}
}

Status parse_deletion_vector(const char* buf, size_t buffer_size, DeleteFileDesc::Format format,
DeleteRows* delete_rows) {
DORIS_CHECK(buf != nullptr);
DORIS_CHECK(delete_rows != nullptr);
DORIS_CHECK(format == DeleteFileDesc::Format::PAIMON ||
format == DeleteFileDesc::Format::ICEBERG);

const size_t checksum_size = format == DeleteFileDesc::Format::ICEBERG ? 4 : 0;
if (buffer_size < 8 + checksum_size) [[unlikely]] {
return Status::DataQualityError("Deletion vector file size too small: {}", buffer_size);
}

auto total_length = BigEndian::Load32(buf);
if (total_length + 4 + checksum_size != buffer_size) [[unlikely]] {
return Status::DataQualityError("Deletion vector length mismatch, expected: {}, actual: {}",
total_length + 4 + checksum_size, buffer_size);
}

constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'};
if (memcmp(buf + sizeof(total_length), MAGIC_NUMBER, 4) != 0) [[unlikely]] {
return Status::DataQualityError("Deletion vector magic number mismatch");
}

const char* bitmap_buf = buf + 8;
const size_t bitmap_size = buffer_size - 8 - checksum_size;
if (format == DeleteFileDesc::Format::PAIMON) {
roaring::Roaring bitmap;
try {
bitmap = roaring::Roaring::readSafe(bitmap_buf, bitmap_size);
} catch (const std::runtime_error& e) {
return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
}

delete_rows->reserve(bitmap.cardinality());
for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
delete_rows->push_back(*it);
}
return Status::OK();
}

roaring::Roaring64Map bitmap;
try {
bitmap = roaring::Roaring64Map::readSafe(bitmap_buf, bitmap_size);
} catch (const std::runtime_error& e) {
return Status::DataQualityError("Decode roaring bitmap failed, {}", e.what());
}

delete_rows->reserve(bitmap.cardinality());
for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
delete_rows->push_back(cast_set<int64_t>(*it));
}
return Status::OK();
}

} // namespace

std::shared_ptr<io::FileSystemProperties> create_system_properties(
Expand Down Expand Up @@ -117,10 +175,17 @@ Status TableReader::_open_local_filter_exprs(const FileScanRequest& file_request
RowDescriptor row_desc;
for (const auto& expression_filter : file_request.expression_filters) {
if (expression_filter.conjunct == nullptr) {
continue;
if (expression_filter.delete_conjunct == nullptr) {
continue;
}
} else {
RETURN_IF_ERROR(expression_filter.conjunct->prepare(_runtime_state, row_desc));
RETURN_IF_ERROR(expression_filter.conjunct->open(_runtime_state));
}
if (expression_filter.delete_conjunct != nullptr) {
RETURN_IF_ERROR(expression_filter.delete_conjunct->prepare(_runtime_state, row_desc));
RETURN_IF_ERROR(expression_filter.delete_conjunct->open(_runtime_state));
}
RETURN_IF_ERROR(expression_filter.conjunct->prepare(_runtime_state, row_desc));
RETURN_IF_ERROR(expression_filter.conjunct->open(_runtime_state));
}
return Status::OK();
}
Expand Down Expand Up @@ -169,12 +234,17 @@ Status TableReader::prepare_split(const SplitReadOptions& options) {
_partition_values = std::move(options.partition_values);
_current_task = std::make_unique<ScanTask>();
_current_task->data_file = create_file_description(options.current_range);
_delete_rows = nullptr;
return _parse_delete_predicates(options);
}

Status TableReader::_parse_delete_predicates(const SplitReadOptions& options) {
DeleteFileDesc desc {.fs_name = options.current_range.fs_name};
if (_parse_delete_file(options.current_range.table_format_params, desc)) {
bool has_delete_file = false;
RETURN_IF_ERROR(_parse_deletion_vector_file(options.current_range.table_format_params, &desc,
&has_delete_file));
if (has_delete_file) {
DORIS_CHECK(options.cache != nullptr);
Status create_status = Status::OK();

_delete_rows = options.cache->get<DeleteRows>(desc.key, [&]() -> DeleteRows* {
Expand All @@ -195,45 +265,11 @@ Status TableReader::_parse_delete_predicates(const SplitReadOptions& options) {
}

const char* buf = buffer.data();
uint32_t actual_length;
std::memcpy(reinterpret_cast<char*>(&actual_length), buf, 4);
std::reverse(reinterpret_cast<char*>(&actual_length),
reinterpret_cast<char*>(&actual_length) + 4);
buf += 4;
if (actual_length != bytes_read - 4) [[unlikely]] {
create_status = Status::RuntimeError(
"DeletionVector deserialize error: length not match, "
"actual length: {}, expect length: {}",
actual_length, bytes_read - 4);
return nullptr;
}
uint32_t magic_number;
std::memcpy(reinterpret_cast<char*>(&magic_number), buf, 4);
std::reverse(reinterpret_cast<char*>(&magic_number),
reinterpret_cast<char*>(&magic_number) + 4);
buf += 4;
const static uint32_t MAGIC_NUMBER = 1581511376;
if (magic_number != MAGIC_NUMBER) [[unlikely]] {
create_status = Status::RuntimeError(
"DeletionVector deserialize error: invalid magic number {}", magic_number);
return nullptr;
}

roaring::Roaring roaring_bitmap;
SCOPED_TIMER(_profile->parse_delete_file_time);
try {
roaring_bitmap = roaring::Roaring::readSafe(buf, bytes_read - 4);
} catch (const std::runtime_error& e) {
create_status = Status::RuntimeError(
"DeletionVector deserialize error: failed to deserialize roaring bitmap, "
"{}",
e.what());
create_status = parse_deletion_vector(buf, bytes_read, desc.format, delete_rows);
if (!create_status.ok()) [[unlikely]] {
return nullptr;
}
delete_rows->reserve(roaring_bitmap.cardinality());
for (auto it = roaring_bitmap.begin(); it != roaring_bitmap.end(); it++) {
delete_rows->push_back(*it);
}
COUNTER_UPDATE(_profile->num_delete_rows, delete_rows->size());
return delete_rows;
});
Expand Down
Loading
Loading