Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 45 additions & 15 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,23 @@ Status CsvReader::init_reader(bool is_load) {

_is_load = is_load;
if (!_is_load) {
// For query task, we need to save the mapping from table schema to file column
// For query task, there are 2 slot mappings.
// One is from file slot to the values in a line.
// eg, the file_slot_descs are k1, k3, k5, and the values in a line are k1, k2, k3, k4, k5,
// the _col_idxs will save: 0, 2, 4
// The other is from file slot to the columns in the output block.
// eg, the file_slot_descs are k1, k3, k5, and the columns in the block are p1, k1, k3, k5,
// where "p1" is the partition col which does not exist in the file,
// the _file_slot_idx_map will save: 1, 2, 3
DCHECK(_params.__isset.column_idxs);
_col_idxs = _params.column_idxs;
int idx = 0;
for (const auto& slot_info : _params.required_slots) {
if (slot_info.is_file_slot) {
_file_slot_idx_map.push_back(idx);
}
idx++;
}
} else {
// For load task, the column order is same as file column order
int i = 0;
Expand All @@ -190,6 +204,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {

const int batch_size = _state->batch_size();
size_t rows = 0;
auto columns = block->mutate_columns();
while (rows < batch_size && !_line_reader_eof) {
const uint8_t* ptr = nullptr;
size_t size = 0;
Expand All @@ -203,7 +218,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
continue;
}

RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, &rows));
RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows));
}

*eof = (rows == 0);
Expand Down Expand Up @@ -303,7 +318,8 @@ Status CsvReader::_create_decompressor() {
return Status::OK();
}

Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, size_t* rows) {
Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
std::vector<MutableColumnPtr>& columns, size_t* rows) {
bool is_success = false;

RETURN_IF_ERROR(_line_split_to_values(line, &is_success));
Expand All @@ -312,18 +328,32 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, size_t* ro
return Status::OK();
}

// if _split_values.size > _file_slot_descs.size()
// we only take the first few columns
for (int i = 0; i < _file_slot_descs.size(); ++i) {
auto src_slot_desc = _file_slot_descs[i];
int col_idx = _col_idxs[i];
// col idx is out of range, fill with null.
const Slice& value =
col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
IColumn* col_ptr =
const_cast<IColumn*>(block->get_by_name(src_slot_desc->col_name()).column.get());
_text_converter->write_vec_column(src_slot_desc, col_ptr, value.data, value.size, true,
false);
if (_is_load) {
for (int i = 0; i < _file_slot_descs.size(); ++i) {
auto src_slot_desc = _file_slot_descs[i];
int col_idx = _col_idxs[i];
// col idx is out of range, fill with null.
const Slice& value =
col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
// For load task, we always read "string" from file, so use "write_string_column"
_text_converter->write_string_column(src_slot_desc, &columns[i], value.data,
value.size);
}
} else {
// if _split_values.size > _file_slot_descs.size()
// we only take the first few columns
for (int i = 0; i < _file_slot_descs.size(); ++i) {
auto src_slot_desc = _file_slot_descs[i];
int col_idx = _col_idxs[i];
// col idx is out of range, fill with null.
const Slice& value =
col_idx < _split_values.size() ? _split_values[col_idx] : _s_null_slice;
IColumn* col_ptr = const_cast<IColumn*>(
block->get_by_position(_file_slot_idx_map[i]).column.get());
// For query task, we will convert values to final column type, so use "write_vec_column"
_text_converter->write_vec_column(src_slot_desc, col_ptr, value.data, value.size, true,
false);
}
}
++(*rows);

Expand Down
8 changes: 7 additions & 1 deletion be/src/vec/exec/format/csv/csv_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ class CsvReader : public GenericReader {
private:
// used for stream/broker load of csv file.
Status _create_decompressor();
Status _fill_dest_columns(const Slice& line, Block* block, size_t* rows);
Status _fill_dest_columns(const Slice& line, Block* block,
std::vector<MutableColumnPtr>& columns, size_t* rows);
Status _line_split_to_values(const Slice& line, bool* success);
void _split_line(const Slice& line);
Status _check_array_format(std::vector<Slice>& split_values, bool* is_success);
Expand All @@ -77,6 +78,11 @@ class CsvReader : public GenericReader {
const TFileScanRangeParams& _params;
const TFileRangeDesc& _range;
const std::vector<SlotDescriptor*>& _file_slot_descs;
// Only for query task, save the mapping from file slot to column position in the block.
// eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3
// and these 3 columns are ordered in the block as k2, k3, k1,
// the _file_slot_idx_map will save: 2, 0, 1
std::vector<int> _file_slot_idx_map;
// Only for query task, save the columns' index which need to be read.
// eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3
// and the corresponding position in file is 0, 3, 5.
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/vfile_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class VFileScanner : public VScanner {
std::map<std::string, int> _file_slot_name_map;
// col names from _file_slot_descs
std::vector<std::string> _file_col_names;

// Partition source slot descriptors
std::vector<SlotDescriptor*> _partition_slot_descs;
// Partition slot id to index in _partition_slot_descs
Expand Down