Skip to content

Commit

Permalink
6
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Aug 22, 2023
1 parent 8370e1a commit 3cd8a91
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 25 deletions.
6 changes: 4 additions & 2 deletions be/src/exec/decompressor.cpp
Expand Up @@ -380,9 +380,11 @@ Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t

// Decompress
int uncompressed_len = LZ4_decompress_safe(reinterpret_cast<const char*>(src),
reinterpret_cast<char*>(output), compressed_len, remaining_output_size);
reinterpret_cast<char*>(output), compressed_len,
remaining_output_size);
if (uncompressed_len < 0) {
return Status::InternalError("lz4 block decompress failed. uncompressed_len < 0: {}", uncompressed_len);
return Status::InternalError("lz4 block decompress failed. uncompressed_len < 0: {}",
uncompressed_len);
}

output += uncompressed_len;
Expand Down
48 changes: 25 additions & 23 deletions be/src/vec/exec/format/csv/csv_reader.cpp
Expand Up @@ -387,27 +387,27 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
const int batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE);
size_t rows = 0;

if (_push_down_agg_type == TPushAggOp::type::COUNT) {
while (rows < batch_size && !_line_reader_eof) {
const uint8_t* ptr = nullptr;
size_t size = 0;
RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
if (_skip_lines > 0) {
_skip_lines--;
continue;
}
if (size == 0) {
// Read empty row, just continue
continue;
}
++rows;
}

for (auto& col : block->mutate_columns()) {
col->resize(rows);
}

} else {
if (_push_down_agg_type == TPushAggOp::type::COUNT) {
while (rows < batch_size && !_line_reader_eof) {
const uint8_t* ptr = nullptr;
size_t size = 0;
RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
if (_skip_lines > 0) {
_skip_lines--;
continue;
}
if (size == 0) {
// Read empty row, just continue
continue;
}
++rows;
}

for (auto& col : block->mutate_columns()) {
col->resize(rows);
}

} else {
auto columns = block->mutate_columns();
while (rows < batch_size && !_line_reader_eof) {
const uint8_t* ptr = nullptr;
Expand Down Expand Up @@ -483,7 +483,8 @@ Status CsvReader::_create_decompressor() {
compress_type = CompressType::BZIP2;
break;
case TFileCompressType::LZ4FRAME:
compress_type = config::default_lz4_codec == "block" ? CompressType::LZ4BLOCK : CompressType::LZ4FRAME;
compress_type = config::default_lz4_codec == "block" ? CompressType::LZ4BLOCK
: CompressType::LZ4FRAME;
break;
case TFileCompressType::LZ4BLOCK:
compress_type = CompressType::LZ4BLOCK;
Expand All @@ -508,7 +509,8 @@ Status CsvReader::_create_decompressor() {
compress_type = CompressType::BZIP2;
break;
case TFileFormatType::FORMAT_CSV_LZ4FRAME:
compress_type = config::default_lz4_codec == "block" ? CompressType::LZ4BLOCK : CompressType::LZ4FRAME;
compress_type = config::default_lz4_codec == "block" ? CompressType::LZ4BLOCK
: CompressType::LZ4FRAME;
break;
case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
compress_type = CompressType::LZ4BLOCK;
Expand Down

0 comments on commit 3cd8a91

Please sign in to comment.