Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #51088 to 23.5: Improve progress bar for file/s3/hdfs/url table functions. Step 1 #51235

Merged
merged 2 commits into from Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/IO/WithFileSize.cpp
Expand Up @@ -3,6 +3,7 @@
#include <IO/CompressedReadBufferWrapper.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/ReadBufferFromFileDecorator.h>
#include <IO/PeekableReadBuffer.h>

namespace DB
{
Expand Down Expand Up @@ -37,6 +38,18 @@ size_t getFileSizeFromReadBuffer(ReadBuffer & in)
return getFileSize(in);
}

/// Non-throwing variant of getFileSizeFromReadBuffer: any failure to determine
/// the size (e.g. the buffer has no associated file) yields std::nullopt.
std::optional<size_t> tryGetFileSizeFromReadBuffer(ReadBuffer & in)
{
    std::optional<size_t> file_size;
    try
    {
        file_size = getFileSizeFromReadBuffer(in);
    }
    catch (...) /// Deliberately swallow: "try" semantics — absence of a size is not an error here.
    {
    }
    return file_size;
}

bool isBufferWithFileSize(const ReadBuffer & in)
{
if (const auto * delegate = dynamic_cast<const ReadBufferFromFileDecorator *>(&in))
Expand All @@ -51,4 +64,23 @@ bool isBufferWithFileSize(const ReadBuffer & in)
return dynamic_cast<const WithFileSize *>(&in) != nullptr;
}

size_t getDataOffsetMaybeCompressed(const ReadBuffer & in)
{
if (const auto * delegate = dynamic_cast<const ReadBufferFromFileDecorator *>(&in))
{
return getDataOffsetMaybeCompressed(delegate->getWrappedReadBuffer());
}
else if (const auto * compressed = dynamic_cast<const CompressedReadBufferWrapper *>(&in))
{
return getDataOffsetMaybeCompressed(compressed->getWrappedReadBuffer());
}
else if (const auto * peekable = dynamic_cast<const PeekableReadBuffer *>(&in))
{
return getDataOffsetMaybeCompressed(peekable->getSubBuffer());
}

return in.count();
}


}
5 changes: 5 additions & 0 deletions src/IO/WithFileSize.h
Expand Up @@ -18,4 +18,9 @@ bool isBufferWithFileSize(const ReadBuffer & in);

size_t getFileSizeFromReadBuffer(ReadBuffer & in);

/// Returns std::nullopt if the file size could not be determined.
std::optional<size_t> tryGetFileSizeFromReadBuffer(ReadBuffer & in);

size_t getDataOffsetMaybeCompressed(const ReadBuffer & in);

}
2 changes: 2 additions & 0 deletions src/Processors/Formats/IInputFormat.h
Expand Up @@ -53,6 +53,8 @@ class IInputFormat : public ISource

void setErrorsLogger(const InputFormatErrorsLoggerPtr & errors_logger_) { errors_logger = errors_logger_; }

virtual size_t getApproxBytesReadForChunk() const { return 0; }

protected:
ColumnMappingPtr column_mapping{};

Expand Down
2 changes: 2 additions & 0 deletions src/Processors/Formats/IRowInputFormat.cpp
Expand Up @@ -96,6 +96,7 @@ Chunk IRowInputFormat::generate()
block_missing_values.clear();

size_t num_rows = 0;
size_t chunk_start_offset = getDataOffsetMaybeCompressed(getReadBuffer());

try
{
Expand Down Expand Up @@ -242,6 +243,7 @@ Chunk IRowInputFormat::generate()
column->finalize();

Chunk chunk(std::move(columns), num_rows);
approx_bytes_read_for_chunk = getDataOffsetMaybeCompressed(getReadBuffer()) - chunk_start_offset;
return chunk;
}

Expand Down
3 changes: 3 additions & 0 deletions src/Processors/Formats/IRowInputFormat.h
Expand Up @@ -74,6 +74,8 @@ class IRowInputFormat : public IInputFormat

size_t getTotalRows() const { return total_rows; }

size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; }

Serializations serializations;

private:
Expand All @@ -83,6 +85,7 @@ class IRowInputFormat : public IInputFormat
size_t num_errors = 0;

BlockMissingValues block_missing_values;
size_t approx_bytes_read_for_chunk;
};

}
7 changes: 6 additions & 1 deletion src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp
Expand Up @@ -33,7 +33,7 @@ Chunk ArrowBlockInputFormat::generate()
Chunk res;
block_missing_values.clear();
arrow::Result<std::shared_ptr<arrow::RecordBatch>> batch_result;

size_t batch_start = getDataOffsetMaybeCompressed(*in);
if (stream)
{
if (!stream_reader)
Expand Down Expand Up @@ -76,6 +76,11 @@ Chunk ArrowBlockInputFormat::generate()
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result, (*table_result)->num_rows(), block_missing_values_ptr);

/// There is no easy way to get original record batch size from Arrow metadata.
/// Let's just use the number of bytes read from read buffer.
auto batch_end = getDataOffsetMaybeCompressed(*in);
if (batch_end > batch_start)
approx_bytes_read_for_chunk = batch_end - batch_start;
return res;
}

Expand Down
3 changes: 3 additions & 0 deletions src/Processors/Formats/Impl/ArrowBlockInputFormat.h
Expand Up @@ -27,6 +27,8 @@ class ArrowBlockInputFormat : public IInputFormat

const BlockMissingValues & getMissingValues() const override;

size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; }

private:
Chunk generate() override;

Expand All @@ -48,6 +50,7 @@ class ArrowBlockInputFormat : public IInputFormat
int record_batch_current = 0;

BlockMissingValues block_missing_values;
size_t approx_bytes_read_for_chunk;

const FormatSettings format_settings;

Expand Down
Expand Up @@ -117,6 +117,7 @@ Chunk JSONColumnsBlockInputFormatBase::generate()
if (reader->checkChunkEnd())
return Chunk(std::move(columns), 0);

size_t chunk_start = getDataOffsetMaybeCompressed(*in);
std::vector<UInt8> seen_columns(columns.size(), 0);
Int64 rows = -1;
size_t iteration = 0;
Expand Down Expand Up @@ -151,6 +152,8 @@ Chunk JSONColumnsBlockInputFormatBase::generate()
}
while (!reader->checkChunkEndOrSkipColumnDelimiter());

approx_bytes_read_for_chunk = getDataOffsetMaybeCompressed(*in) - chunk_start;

if (rows <= 0)
return Chunk(std::move(columns), 0);

Expand Down
3 changes: 3 additions & 0 deletions src/Processors/Formats/Impl/JSONColumnsBlockInputFormatBase.h
Expand Up @@ -53,6 +53,8 @@ class JSONColumnsBlockInputFormatBase : public IInputFormat

const BlockMissingValues & getMissingValues() const override { return block_missing_values; }

size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; }

protected:
Chunk generate() override;

Expand All @@ -65,6 +67,7 @@ class JSONColumnsBlockInputFormatBase : public IInputFormat
Serializations serializations;
std::unique_ptr<JSONColumnsReaderBase> reader;
BlockMissingValues block_missing_values;
size_t approx_bytes_read_for_chunk;
};


Expand Down
6 changes: 6 additions & 0 deletions src/Processors/Formats/Impl/NativeFormat.cpp
Expand Up @@ -38,7 +38,10 @@ class NativeInputFormat final : public IInputFormat
Chunk generate() override
{
block_missing_values.clear();
size_t block_start = getDataOffsetMaybeCompressed(*in);
auto block = reader->read();
approx_bytes_read_for_chunk = getDataOffsetMaybeCompressed(*in) - block_start;

if (!block)
return {};

Expand All @@ -57,10 +60,13 @@ class NativeInputFormat final : public IInputFormat

const BlockMissingValues & getMissingValues() const override { return block_missing_values; }

size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; }

private:
std::unique_ptr<NativeReader> reader;
Block header;
BlockMissingValues block_missing_values;
size_t approx_bytes_read_for_chunk;
};

class NativeOutputFormat final : public IOutputFormat
Expand Down
1 change: 1 addition & 0 deletions src/Processors/Formats/Impl/ORCBlockInputFormat.cpp
Expand Up @@ -64,6 +64,7 @@ Chunk ORCBlockInputFormat::generate()
if (!table || !num_rows)
return {};

approx_bytes_read_for_chunk = file_reader->GetRawORCReader()->getStripe(stripe_current)->getDataLength();
++stripe_current;

Chunk res;
Expand Down
3 changes: 3 additions & 0 deletions src/Processors/Formats/Impl/ORCBlockInputFormat.h
Expand Up @@ -29,6 +29,8 @@ class ORCBlockInputFormat : public IInputFormat

const BlockMissingValues & getMissingValues() const override;

size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; }

protected:
Chunk generate() override;

Expand All @@ -50,6 +52,7 @@ class ORCBlockInputFormat : public IInputFormat
std::vector<int> include_indices;

BlockMissingValues block_missing_values;
size_t approx_bytes_read_for_chunk;

const FormatSettings format_settings;
const std::unordered_set<int> & skip_stripes;
Expand Down
8 changes: 8 additions & 0 deletions src/Processors/Formats/Impl/ParallelParsingInputFormat.cpp
Expand Up @@ -39,8 +39,10 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupPtr thread
// Segmentating the original input.
unit.segment.resize(0);

size_t segment_start = getDataOffsetMaybeCompressed(*in);
auto [have_more_data, currently_read_rows] = file_segmentation_engine(*in, unit.segment, min_chunk_bytes, max_block_size);

unit.original_segment_size = getDataOffsetMaybeCompressed(*in) - segment_start;
unit.offset = successfully_read_rows_count;
successfully_read_rows_count += currently_read_rows;

Expand Down Expand Up @@ -108,6 +110,11 @@ void ParallelParsingInputFormat::parserThreadFunction(ThreadGroupPtr thread_grou
/// NOLINTNEXTLINE(bugprone-use-after-move, hicpp-invalid-access-moved)
unit.chunk_ext.chunk.emplace_back(std::move(chunk));
unit.chunk_ext.block_missing_values.emplace_back(parser.getMissingValues());
size_t approx_chunk_size = input_format->getApproxBytesReadForChunk();
/// We could decompress data during file segmentation.
/// Correct chunk size using original segment size.
approx_chunk_size = static_cast<size_t>(std::ceil(static_cast<double>(approx_chunk_size) / unit.segment.size() * unit.original_segment_size));
unit.chunk_ext.approx_chunk_sizes.push_back(approx_chunk_size);
}

/// Extract column_mapping from first parser to propagate it to others
Expand Down Expand Up @@ -237,6 +244,7 @@ Chunk ParallelParsingInputFormat::generate()

Chunk res = std::move(unit.chunk_ext.chunk.at(*next_block_in_current_unit));
last_block_missing_values = std::move(unit.chunk_ext.block_missing_values[*next_block_in_current_unit]);
last_approx_bytes_read_for_chunk = unit.chunk_ext.approx_chunk_sizes.at(*next_block_in_current_unit);

next_block_in_current_unit.value() += 1;

Expand Down
5 changes: 5 additions & 0 deletions src/Processors/Formats/Impl/ParallelParsingInputFormat.h
Expand Up @@ -126,6 +126,8 @@ class ParallelParsingInputFormat : public IInputFormat
return last_block_missing_values;
}

size_t getApproxBytesReadForChunk() const override { return last_approx_bytes_read_for_chunk; }

String getName() const override final { return "ParallelParsingBlockInputFormat"; }

private:
Expand Down Expand Up @@ -200,6 +202,7 @@ class ParallelParsingInputFormat : public IInputFormat
const size_t max_block_size;

BlockMissingValues last_block_missing_values;
size_t last_approx_bytes_read_for_chunk;

/// Non-atomic because it is used in one thread.
std::optional<size_t> next_block_in_current_unit;
Expand Down Expand Up @@ -245,6 +248,7 @@ class ParallelParsingInputFormat : public IInputFormat
{
std::vector<Chunk> chunk;
std::vector<BlockMissingValues> block_missing_values;
std::vector<size_t> approx_chunk_sizes;
};

struct ProcessingUnit
Expand All @@ -256,6 +260,7 @@ class ParallelParsingInputFormat : public IInputFormat

ChunkExt chunk_ext;
Memory<> segment;
size_t original_segment_size;
std::atomic<ProcessingUnitStatus> status;
/// Needed for better exception message.
size_t offset = 0;
Expand Down
7 changes: 6 additions & 1 deletion src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
Expand Up @@ -147,6 +147,9 @@ void ParquetBlockInputFormat::initializeRowGroupReader(size_t row_group_idx)
format_settings.parquet.allow_missing_columns,
format_settings.null_as_default,
format_settings.parquet.case_insensitive_column_matching);

row_group.row_group_bytes_uncompressed = metadata->RowGroup(static_cast<int>(row_group_idx))->total_compressed_size();
row_group.row_group_rows = metadata->RowGroup(static_cast<int>(row_group_idx))->num_rows();
}

void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_idx)
Expand Down Expand Up @@ -253,7 +256,8 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_idx, std::unique_l

auto tmp_table = arrow::Table::FromRecordBatches({*batch});

PendingChunk res = {.chunk_idx = row_group.next_chunk_idx, .row_group_idx = row_group_idx};
size_t approx_chunk_original_size = static_cast<size_t>(std::ceil(static_cast<double>(row_group.row_group_bytes_uncompressed) / row_group.row_group_rows * (*tmp_table)->num_rows()));
PendingChunk res = {.chunk_idx = row_group.next_chunk_idx, .row_group_idx = row_group_idx, .approx_original_chunk_size = approx_chunk_original_size};

/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
Expand Down Expand Up @@ -327,6 +331,7 @@ Chunk ParquetBlockInputFormat::generate()
scheduleMoreWorkIfNeeded(chunk.row_group_idx);

previous_block_missing_values = std::move(chunk.block_missing_values);
previous_approx_bytes_read_for_chunk = chunk.approx_original_chunk_size;
return std::move(chunk.chunk);
}

Expand Down
7 changes: 7 additions & 0 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.h
Expand Up @@ -60,6 +60,8 @@ class ParquetBlockInputFormat : public IInputFormat

const BlockMissingValues & getMissingValues() const override;

size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }

private:
Chunk generate() override;

Expand Down Expand Up @@ -200,6 +202,9 @@ class ParquetBlockInputFormat : public IInputFormat
size_t next_chunk_idx = 0;
size_t num_pending_chunks = 0;

size_t row_group_bytes_uncompressed = 0;
size_t row_group_rows = 0;

// These are only used by the decoding thread, so don't require locking the mutex.
std::unique_ptr<parquet::arrow::FileReader> file_reader;
std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
Expand All @@ -213,6 +218,7 @@ class ParquetBlockInputFormat : public IInputFormat
BlockMissingValues block_missing_values;
size_t chunk_idx; // within row group
size_t row_group_idx;
size_t approx_original_chunk_size;

// For priority_queue.
// In ordered mode we deliver strictly in order of increasing row group idx,
Expand Down Expand Up @@ -267,6 +273,7 @@ class ParquetBlockInputFormat : public IInputFormat
std::unique_ptr<ThreadPool> pool;

BlockMissingValues previous_block_missing_values;
size_t previous_approx_bytes_read_for_chunk;

std::exception_ptr background_exception = nullptr;
std::atomic<int> is_stopped{0};
Expand Down
3 changes: 3 additions & 0 deletions src/Processors/Formats/Impl/ValuesBlockInputFormat.cpp
Expand Up @@ -61,6 +61,7 @@ Chunk ValuesBlockInputFormat::generate()
const Block & header = getPort().getHeader();
MutableColumns columns = header.cloneEmptyColumns();
block_missing_values.clear();
size_t chunk_start = getDataOffsetMaybeCompressed(*buf);

for (size_t rows_in_block = 0; rows_in_block < params.max_block_size; ++rows_in_block)
{
Expand All @@ -79,6 +80,8 @@ Chunk ValuesBlockInputFormat::generate()
}
}

approx_bytes_read_for_chunk = getDataOffsetMaybeCompressed(*buf) - chunk_start;

/// Evaluate expressions, which were parsed using templates, if any
for (size_t i = 0; i < columns.size(); ++i)
{
Expand Down
2 changes: 2 additions & 0 deletions src/Processors/Formats/Impl/ValuesBlockInputFormat.h
Expand Up @@ -40,6 +40,7 @@ class ValuesBlockInputFormat final : public IInputFormat

const BlockMissingValues & getMissingValues() const override { return block_missing_values; }

size_t getApproxBytesReadForChunk() const override { return approx_bytes_read_for_chunk; }
private:
ValuesBlockInputFormat(std::unique_ptr<PeekableReadBuffer> buf_, const Block & header_, const RowInputFormatParams & params_,
const FormatSettings & format_settings_);
Expand Down Expand Up @@ -95,6 +96,7 @@ class ValuesBlockInputFormat final : public IInputFormat
Serializations serializations;

BlockMissingValues block_missing_values;
size_t approx_bytes_read_for_chunk;
};

class ValuesSchemaReader : public IRowSchemaReader
Expand Down