Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use read_bytes/total_bytes_to_read for progress bar in s3/file/url/... table functions #51286

Merged
merged 16 commits on Jul 26, 2023
Merged
9 changes: 5 additions & 4 deletions src/IO/Progress.h
Expand Up @@ -40,9 +40,10 @@ struct ReadProgress
UInt64 read_rows = 0;
UInt64 read_bytes = 0;
UInt64 total_rows_to_read = 0;
UInt64 total_bytes_to_read = 0;

/// All counters are cumulative deltas; total_rows_to_read / total_bytes_to_read
/// are optional totals used to drive the progress bar (0 means "unknown").
ReadProgress(UInt64 read_rows_, UInt64 read_bytes_, UInt64 total_rows_to_read_ = 0, UInt64 total_bytes_to_read_ = 0)
    : read_rows(read_rows_), read_bytes(read_bytes_), total_rows_to_read(total_rows_to_read_), total_bytes_to_read(total_bytes_to_read_) {}
};

struct WriteProgress
Expand Down Expand Up @@ -98,8 +99,8 @@ struct Progress

Progress() = default;

/// Construct from raw counters. The two trailing totals default to 0
/// ("unknown") so existing 2- and 3-argument call sites keep compiling.
Progress(UInt64 read_rows_, UInt64 read_bytes_, UInt64 total_rows_to_read_ = 0, UInt64 total_bytes_to_read_ = 0)
    : read_rows(read_rows_), read_bytes(read_bytes_), total_rows_to_read(total_rows_to_read_), total_bytes_to_read(total_bytes_to_read_) {}

/// Convert a ReadProgress into a Progress. Also carries over
/// total_bytes_to_read so the byte-based progress bar is not silently
/// dropped on this conversion path (ReadProgress gained the field above).
explicit Progress(ReadProgress read_progress)
    : read_rows(read_progress.read_rows), read_bytes(read_progress.read_bytes), total_rows_to_read(read_progress.total_rows_to_read), total_bytes_to_read(read_progress.total_bytes_to_read) {}
Expand Down
2 changes: 1 addition & 1 deletion src/IO/ReadBufferFromFileBase.cpp
Expand Up @@ -42,7 +42,7 @@ void ReadBufferFromFileBase::setProgressCallback(ContextPtr context)

setProfileCallback([file_progress_callback](const ProfileInfo & progress)
{
file_progress_callback(FileProgress(progress.bytes_read, 0));
file_progress_callback(FileProgress(progress.bytes_read));
});
}

Expand Down
3 changes: 3 additions & 0 deletions src/Processors/Executors/ExecutionThreadContext.cpp
Expand Up @@ -56,6 +56,9 @@ static void executeJob(ExecutingGraph::Node * node, ReadProgressCallback * read_
if (read_progress->counters.total_rows_approx)
read_progress_callback->addTotalRowsApprox(read_progress->counters.total_rows_approx);

if (read_progress->counters.total_bytes)
read_progress_callback->addTotalBytes(read_progress->counters.total_bytes);

if (!read_progress_callback->onProgress(read_progress->counters.read_rows, read_progress->counters.read_bytes, read_progress->limits))
node->processor->cancel();
}
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/IRowInputFormat.h
Expand Up @@ -85,7 +85,7 @@ class IRowInputFormat : public IInputFormat
size_t num_errors = 0;

BlockMissingValues block_missing_values;
size_t approx_bytes_read_for_chunk;
size_t approx_bytes_read_for_chunk = 0;
};

}
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/ArrowBlockInputFormat.h
Expand Up @@ -50,7 +50,7 @@ class ArrowBlockInputFormat : public IInputFormat
int record_batch_current = 0;

BlockMissingValues block_missing_values;
size_t approx_bytes_read_for_chunk;
size_t approx_bytes_read_for_chunk = 0;

const FormatSettings format_settings;

Expand Down
Expand Up @@ -67,7 +67,7 @@ class JSONColumnsBlockInputFormatBase : public IInputFormat
Serializations serializations;
std::unique_ptr<JSONColumnsReaderBase> reader;
BlockMissingValues block_missing_values;
size_t approx_bytes_read_for_chunk;
size_t approx_bytes_read_for_chunk = 0;
};


Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/NativeFormat.cpp
Expand Up @@ -66,7 +66,7 @@ class NativeInputFormat final : public IInputFormat
std::unique_ptr<NativeReader> reader;
Block header;
BlockMissingValues block_missing_values;
size_t approx_bytes_read_for_chunk;
size_t approx_bytes_read_for_chunk = 0;
};

class NativeOutputFormat final : public IOutputFormat
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/ORCBlockInputFormat.h
Expand Up @@ -52,7 +52,7 @@ class ORCBlockInputFormat : public IInputFormat
std::vector<int> include_indices;

BlockMissingValues block_missing_values;
size_t approx_bytes_read_for_chunk;
size_t approx_bytes_read_for_chunk = 0;

const FormatSettings format_settings;
const std::unordered_set<int> & skip_stripes;
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/ParallelParsingInputFormat.h
Expand Up @@ -202,7 +202,7 @@ class ParallelParsingInputFormat : public IInputFormat
const size_t max_block_size;

BlockMissingValues last_block_missing_values;
size_t last_approx_bytes_read_for_chunk;
size_t last_approx_bytes_read_for_chunk = 0;

/// Non-atomic because it is used in one thread.
std::optional<size_t> next_block_in_current_unit;
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/ParquetBlockInputFormat.h
Expand Up @@ -273,7 +273,7 @@ class ParquetBlockInputFormat : public IInputFormat
std::unique_ptr<ThreadPool> pool;

BlockMissingValues previous_block_missing_values;
size_t previous_approx_bytes_read_for_chunk;
size_t previous_approx_bytes_read_for_chunk = 0;

std::exception_ptr background_exception = nullptr;
std::atomic<int> is_stopped{0};
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/ValuesBlockInputFormat.h
Expand Up @@ -96,7 +96,7 @@ class ValuesBlockInputFormat final : public IInputFormat
Serializations serializations;

BlockMissingValues block_missing_values;
size_t approx_bytes_read_for_chunk;
size_t approx_bytes_read_for_chunk = 0;
};

class ValuesSchemaReader : public IRowSchemaReader
Expand Down
1 change: 1 addition & 0 deletions src/Processors/IProcessor.h
Expand Up @@ -343,6 +343,7 @@ class IProcessor
uint64_t read_rows = 0;
uint64_t read_bytes = 0;
uint64_t total_rows_approx = 0;
uint64_t total_bytes = 0;
};

struct ReadProgress
Expand Down
1 change: 1 addition & 0 deletions src/Processors/ISource.h
Expand Up @@ -43,6 +43,7 @@ class ISource : public IProcessor
std::optional<ReadProgress> getReadProgress() final;

void addTotalRowsApprox(size_t value) { read_progress.total_rows_approx += value; }
void addTotalBytes(size_t value) { read_progress.total_bytes += value; }
};

using SourcePtr = std::shared_ptr<ISource>;
Expand Down
2 changes: 2 additions & 0 deletions src/Processors/Sources/RemoteSource.cpp
Expand Up @@ -77,6 +77,8 @@ std::optional<Chunk> RemoteSource::tryGenerate()
{
if (value.total_rows_to_read)
addTotalRowsApprox(value.total_rows_to_read);
if (value.total_bytes_to_read)
addTotalBytes(value.total_bytes_to_read);
progress(value.read_rows, value.read_bytes);
});

Expand Down
12 changes: 12 additions & 0 deletions src/QueryPipeline/ReadProgressCallback.cpp
Expand Up @@ -63,6 +63,18 @@ bool ReadProgressCallback::onProgress(uint64_t read_rows, uint64_t read_bytes, c
process_list_elem->updateProgressIn(total_rows_progress);
}

size_t bytes = 0;
if ((bytes = total_bytes.exchange(0)) != 0)
{
Progress total_bytes_progress = {0, 0, 0, bytes};

if (progress_callback)
progress_callback(total_bytes_progress);

if (process_list_elem)
process_list_elem->updateProgressIn(total_bytes_progress);
}

Progress value {read_rows, read_bytes};

if (progress_callback)
Expand Down
3 changes: 3 additions & 0 deletions src/QueryPipeline/ReadProgressCallback.h
Expand Up @@ -23,6 +23,7 @@ class ReadProgressCallback
void setProcessListElement(QueryStatusPtr elem);
void setProgressCallback(const ProgressCallback & callback) { progress_callback = callback; }
void addTotalRowsApprox(size_t value) { total_rows_approx += value; }
void addTotalBytes(size_t value) { total_bytes += value; }

/// Skip updating profile events.
/// For merges in mutations it may need special logic, it's done inside ProgressCallback.
Expand All @@ -37,6 +38,8 @@ class ReadProgressCallback

/// The approximate total number of rows to read. For progress bar.
std::atomic_size_t total_rows_approx = 0;
/// The total number of bytes to read. For progress bar.
std::atomic_size_t total_bytes = 0;

std::mutex limits_and_quotas_mutex;
Stopwatch total_stopwatch{CLOCK_MONOTONIC_COARSE}; /// Including waiting time
Expand Down
18 changes: 17 additions & 1 deletion src/Storages/HDFS/HDFSCommon.h
Expand Up @@ -57,7 +57,23 @@ static const String CONFIG_PREFIX;
~HDFSBuilderWrapper() { hdfsFreeBuilder(hdfs_builder); }

HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete;
HDFSBuilderWrapper(HDFSBuilderWrapper &&) = default;
HDFSBuilderWrapper & operator=(const HDFSBuilderWrapper &) = delete;

/// Move construction is implemented by delegating to move assignment:
/// members are swapped/moved from `other`, which is left safe to destroy.
/// NOTE(review): the swap of hdfs_builder inside operator= reads this
/// object's not-yet-assigned member — assumes hdfs_builder has an in-class
/// initializer (e.g. nullptr); confirm in the member declarations.
HDFSBuilderWrapper(HDFSBuilderWrapper && other) noexcept
{
*this = std::move(other);
}

/// Move assignment: takes over other's builder handle and kerberos settings.
/// The builder pointer is exchanged via swap, so this object's previous
/// builder (if any) migrates into `other` and is released by other's
/// destructor (hdfsFreeBuilder).
HDFSBuilderWrapper & operator=(HDFSBuilderWrapper && other) noexcept
{
    /// Guard against self-move: without it, the member-wise
    /// `x = std::move(x)` assignments below would leave the strings and
    /// config storage in an unspecified state.
    if (this != &other)
    {
        std::swap(hdfs_builder, other.hdfs_builder);
        config_stor = std::move(other.config_stor);
        hadoop_kerberos_keytab = std::move(other.hadoop_kerberos_keytab);
        hadoop_kerberos_principal = std::move(other.hadoop_kerberos_principal);
        hadoop_security_kerberos_ticket_cache_path = std::move(other.hadoop_security_kerberos_ticket_cache_path);
        need_kinit = std::move(other.need_kinit);
    }
    return *this;
}

hdfsBuilder * get() { return hdfs_builder; }

Expand Down
20 changes: 15 additions & 5 deletions src/Storages/HDFS/ReadBufferFromHDFS.cpp
Expand Up @@ -3,6 +3,7 @@
#if USE_HDFS
#include <Storages/HDFS/HDFSCommon.h>
#include <IO/ResourceGuard.h>
#include <IO/Progress.h>
#include <Common/Throttler.h>
#include <Common/safe_cast.h>
#include <hdfs/hdfs.h>
Expand Down Expand Up @@ -42,19 +43,23 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
off_t file_offset = 0;
off_t read_until_position = 0;

std::optional<size_t> file_size;

explicit ReadBufferFromHDFSImpl(
const std::string & hdfs_uri_,
const std::string & hdfs_file_path_,
const Poco::Util::AbstractConfiguration & config_,
const ReadSettings & read_settings_,
size_t read_until_position_,
bool use_external_buffer_)
bool use_external_buffer_,
std::optional<size_t> file_size_)
: BufferWithOwnMemory<SeekableReadBuffer>(use_external_buffer_ ? 0 : read_settings_.remote_fs_buffer_size)
, hdfs_uri(hdfs_uri_)
, hdfs_file_path(hdfs_file_path_)
, builder(createHDFSBuilder(hdfs_uri_, config_))
, read_settings(read_settings_)
, read_until_position(read_until_position_)
, file_size(file_size_)
{
fs = createHDFSFS(builder.get());
fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0);
Expand All @@ -70,12 +75,16 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
hdfsCloseFile(fs.get(), fin);
}

size_t getFileSize() const
size_t getFileSize()
{
if (file_size)
return *file_size;

auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path);
return file_info->mSize;
file_size = static_cast<size_t>(file_info->mSize);
return *file_size;
}

bool nextImpl() override
Expand Down Expand Up @@ -156,10 +165,11 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(
const Poco::Util::AbstractConfiguration & config_,
const ReadSettings & read_settings_,
size_t read_until_position_,
bool use_external_buffer_)
bool use_external_buffer_,
std::optional<size_t> file_size_)
: ReadBufferFromFileBase(read_settings_.remote_fs_buffer_size, nullptr, 0)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(
hdfs_uri_, hdfs_file_path_, config_, read_settings_, read_until_position_, use_external_buffer_))
hdfs_uri_, hdfs_file_path_, config_, read_settings_, read_until_position_, use_external_buffer_, file_size_))
, use_external_buffer(use_external_buffer_)
{
}
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/HDFS/ReadBufferFromHDFS.h
Expand Up @@ -29,7 +29,8 @@ struct ReadBufferFromHDFSImpl;
const Poco::Util::AbstractConfiguration & config_,
const ReadSettings & read_settings_,
size_t read_until_position_ = 0,
bool use_external_buffer = false);
bool use_external_buffer = false,
std::optional<size_t> file_size = std::nullopt);

~ReadBufferFromHDFS() override;

Expand Down