Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 54 additions & 12 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/serde/data_type_string_serde.h"
#include "vec/exec/format/file_reader/new_plain_binary_line_reader.h"
#include "vec/exec/format/file_reader/new_plain_text_line_reader.h"
#include "vec/exec/scan/scanner.h"
Expand Down Expand Up @@ -355,6 +356,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
block->set_columns(std::move(mutate_columns));
} else {
auto columns = block->mutate_columns();
_nullable_str_col_cache.clear();
while (rows < batch_size && !_line_reader_eof) {
const uint8_t* ptr = nullptr;
size_t size = 0;
Expand Down Expand Up @@ -643,6 +645,32 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
std::vector<MutableColumnPtr>& columns, size_t* rows) {
bool is_success = false;

// Initialize cached column pointers on first call per batch to avoid per-row assert_cast.
// Also pre-reserve capacity for offsets/chars/null_map to eliminate realloc during row loop.
if (UNLIKELY(_nullable_str_col_cache.empty())) {
_nullable_str_col_cache.resize(_file_slot_descs.size());
_has_escape_char = (_options.escape_char != 0);
const size_t batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE);
for (int i = 0; i < _file_slot_descs.size(); ++i) {
if (_use_nullable_string_opt[i]) {
IColumn* col_ptr =
_is_load
? columns[i].get()
: const_cast<IColumn*>(block->get_by_position(_file_slot_idx_map[i])
.column.get());
auto& null_col = assert_cast<ColumnNullable&>(*col_ptr);
auto* str_col = assert_cast<ColumnStr<uint32_t>*>(&null_col.get_nested_column());
auto* null_map = &null_col.get_null_map_data();
_nullable_str_col_cache[i].nested_str_col = str_col;
_nullable_str_col_cache[i].null_map = null_map;
// Pre-reserve to avoid repeated realloc inside insert_data/push_back.
str_col->get_offsets().reserve(str_col->get_offsets().size() + batch_size);
str_col->get_chars().reserve(str_col->get_chars().size() + batch_size * 64);
null_map->reserve(null_map->size() + batch_size);
}
}
}

RETURN_IF_ERROR(_line_split_to_values(line, &is_success));
if (UNLIKELY(!is_success)) {
// If not success, which means we met an invalid row, filter this row and return.
Expand All @@ -656,20 +684,34 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
? _split_values[col_idx]
: Slice(_options.null_format, _options.null_len);

IColumn* col_ptr = columns[i].get();
if (!_is_load) {
// block is a Block*, and get_by_position returns a ColumnPtr,
// which is a const pointer. Therefore, using const_cast is permissible.
col_ptr = const_cast<IColumn*>(
block->get_by_position(_file_slot_idx_map[i]).column.get());
}

if (_use_nullable_string_opt[i]) {
// For load task, we always read "string" from file.
// So serdes[i] here must be DataTypeNullableSerDe, and DataTypeNullableSerDe -> nested_serde must be DataTypeStringSerDe.
// So we use deserialize_nullable_string and stringSerDe to reduce virtual function calls.
RETURN_IF_ERROR(_deserialize_nullable_string(*col_ptr, value));
// Inline fast path: bypass StringSerDe and per-row assert_cast entirely.
auto& cache = _nullable_str_col_cache[i];
if (_empty_field_as_null && value.size == 0) {
cache.nested_str_col->insert_default();
cache.null_map->push_back(1);
continue;
}
if (_options.null_len > 0 &&
!(_options.converted_from_string && value.trim_double_quotes())) {
if (value.compare(Slice(_options.null_format, _options.null_len)) == 0) {
cache.nested_str_col->insert_default();
cache.null_map->push_back(1);
continue;
}
}
if (UNLIKELY(_has_escape_char)) {
escape_string_for_csv(value.data, &value.size, _options.escape_char,
_options.quote_char);
}
cache.nested_str_col->insert_data(value.data, value.size);
cache.null_map->push_back(0);
} else {
IColumn* col_ptr = columns[i].get();
if (!_is_load) {
col_ptr = const_cast<IColumn*>(
block->get_by_position(_file_slot_idx_map[i]).column.get());
}
RETURN_IF_ERROR(_deserialize_one_cell(_serdes[i], *col_ptr, value));
}
}
Expand Down
10 changes: 10 additions & 0 deletions be/src/vec/exec/format/csv/csv_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
#include "io/file_factory.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "util/slice.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/data_types/data_type.h"
#include "vec/exec/format/file_reader/new_plain_text_line_reader.h"
#include "vec/exec/format/generic_reader.h"
Expand Down Expand Up @@ -283,6 +285,14 @@ class CsvReader : public GenericReader {
// save source text which have been splitted.
std::vector<Slice> _split_values;
std::vector<int> _use_nullable_string_opt;

// Cached column pointers for nullable string fast path, avoiding per-row assert_cast.
struct NullableStringColumnCache {
ColumnStr<uint32_t>* nested_str_col = nullptr;
NullMap* null_map = nullptr;
};
std::vector<NullableStringColumnCache> _nullable_str_col_cache;
bool _has_escape_char = false;
};
} // namespace vectorized
#include "common/compile_check_end.h"
Expand Down
Loading