Read Parquet files faster #47964

Merged · 7 commits · Apr 17, 2023
2 changes: 2 additions & 0 deletions src/Common/CurrentMetrics.cpp
@@ -140,6 +140,8 @@
M(RestartReplicaThreadsActive, "Number of threads in the RESTART REPLICA thread pool running a task.") \
M(QueryPipelineExecutorThreads, "Number of threads in the PipelineExecutor thread pool.") \
M(QueryPipelineExecutorThreadsActive, "Number of threads in the PipelineExecutor thread pool running a task.") \
+M(ParquetDecoderThreads, "Number of threads in the ParquetBlockInputFormat thread pool.") \
+M(ParquetDecoderThreadsActive, "Number of threads in the ParquetBlockInputFormat thread pool running a task.") \
M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \
M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that have been marked as broken. This metric starts from 0 on server start. Number of files for every shard is summed.") \
M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \
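The new pair mirrors the QueryPipelineExecutorThreads metrics directly above it. As a minimal sketch (an assumption for illustration, not code from this diff), a decoding worker would typically drive the Active counter through the existing CurrentMetrics::Increment RAII helper:

    #include <Common/CurrentMetrics.h>

    namespace CurrentMetrics
    {
        /// Declared in CurrentMetrics.cpp by the M(...) entries added above.
        extern const Metric ParquetDecoderThreadsActive;
    }

    static void decodeOneRowGroup()
    {
        /// Increments ParquetDecoderThreadsActive on entry, decrements on scope exit.
        CurrentMetrics::Increment active{CurrentMetrics::ParquetDecoderThreadsActive};
        /// ... decode a single Parquet row group here ...
    }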
4 changes: 2 additions & 2 deletions src/Common/ProfileEvents.cpp
@@ -63,7 +63,7 @@
M(DiskReadElapsedMicroseconds, "Total time spent waiting for read syscall. This includes reads from the page cache.") \
M(DiskWriteElapsedMicroseconds, "Total time spent waiting for write syscall. This includes writes to the page cache.") \
M(NetworkReceiveElapsedMicroseconds, "Total time spent waiting for data to receive or receiving data from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
-M(NetworkSendElapsedMicroseconds, "Total time spent waiting for data to send to network or sending data to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries..") \
+M(NetworkSendElapsedMicroseconds, "Total time spent waiting for data to send to network or sending data to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkReceiveBytes, "Total number of bytes received from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
M(NetworkSendBytes, "Total number of bytes sent to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \
\
@@ -249,7 +249,7 @@
M(RWLockWritersWaitMilliseconds, "Total time spent waiting for a write lock to be acquired (in a heavy RWLock).") \
M(DNSError, "Total count of errors in DNS resolution") \
\
-M(RealTimeMicroseconds, "Total (wall clock) time spent in processing (queries and other tasks) threads (not that this is a sum).") \
+M(RealTimeMicroseconds, "Total (wall clock) time spent in processing (queries and other tasks) threads (note that this is a sum).") \
M(UserTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in user space. This includes time the CPU pipeline was stalled due to cache misses, branch mispredictions, hyper-threading, etc.") \
M(SystemTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in OS kernel space. This includes time the CPU pipeline was stalled due to cache misses, branch mispredictions, hyper-threading, etc.") \
M(MemoryOvercommitWaitTimeMicroseconds, "Total time spent in waiting for memory to be freed in OvercommitTracker.") \
46 changes: 0 additions & 46 deletions src/Common/RangeGenerator.h

This file was deleted.

2 changes: 1 addition & 1 deletion src/Common/ThreadPool.h
@@ -323,7 +323,7 @@ using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
/// one is at GlobalThreadPool level, the other is at ThreadPool level, so tracing context will be initialized on the same thread twice.
///
/// Once the worker on ThreadPool gains the control of execution, it won't return until it's shutdown,
-/// which means the tracing context initialized at underlying worker level won't be delete for a very long time.
+/// which means the tracing context initialized at underlying worker level won't be deleted for a very long time.
/// This would cause wrong context for further jobs scheduled in ThreadPool.
///
/// To make sure the tracing context is correctly propagated, we explicitly disable context propagation (including initialization and de-initialization) at the underlying worker level.
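For reference, the declaration this comment documents, assuming it sits directly below the comment as the hunk context above suggests:

    /// The <false> template argument is what disables tracing-context
    /// propagation in the underlying worker thread.
    using ThreadFromGlobalPoolNoTracingContextPropagation = ThreadFromGlobalPoolImpl<false>;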
22 changes: 0 additions & 22 deletions src/Common/tests/gtest_range_generator.cpp

This file was deleted.

25 changes: 22 additions & 3 deletions src/Core/BackgroundSchedulePool.cpp
@@ -11,6 +11,9 @@
namespace DB
{

+namespace ErrorCodes { extern const int CANNOT_SCHEDULE_TASK; }


BackgroundSchedulePoolTaskInfo::BackgroundSchedulePoolTaskInfo(
BackgroundSchedulePool & pool_, const std::string & log_name_, const BackgroundSchedulePool::TaskFunc & function_)
: pool(pool_), log_name(log_name_), function(function_)
@@ -158,10 +161,26 @@ BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Met
LOG_INFO(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name), "Create BackgroundSchedulePool with {} threads", size_);

threads.resize(size_);
-for (auto & thread : threads)
-    thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
-
-delayed_thread = std::make_unique<ThreadFromGlobalPoolNoTracingContextPropagation>([this] { delayExecutionThreadFunction(); });
+try
+{
+    for (auto & thread : threads)
+        thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });
+
+    delayed_thread = std::make_unique<ThreadFromGlobalPoolNoTracingContextPropagation>([this] { delayExecutionThreadFunction(); });
+}
+catch (...)
+{
+    LOG_FATAL(
+        &Poco::Logger::get("BackgroundSchedulePool/" + thread_name),
+        "Couldn't get {} threads from global thread pool: {}",
+        size_,
+        getCurrentExceptionCode() == DB::ErrorCodes::CANNOT_SCHEDULE_TASK
+            ? "Not enough threads. Please make sure max_thread_pool_size is considerably "
+              "bigger than background_schedule_pool_size."
+            : getCurrentExceptionMessage(/* with_stacktrace */ true));
+    abort();
+}
Member Author commented:
(This is unrelated to the rest of the PR. Without this, the server gives you a quiet SIGABRT if you set max_thread_pool_size too small. Now it'll log an error.)

}


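To make the failure mode concrete: each ThreadFromGlobalPoolNoTracingContextPropagation constructed above schedules its worker onto the global thread pool, and that scheduling throws CANNOT_SCHEDULE_TASK once max_thread_pool_size is exhausted. A minimal sketch of the reproduction, with illustrative config values that are assumptions rather than values from the PR:

    /// Assume the server config sets max_thread_pool_size = 100 while
    /// background_schedule_pool_size = 512. One of these constructions then
    /// throws DB::Exception(CANNOT_SCHEDULE_TASK); before this change the
    /// exception escaped the constructor and ended in a bare SIGABRT.
    for (auto & thread : threads)
        thread = ThreadFromGlobalPoolNoTracingContextPropagation([this] { threadFunction(); });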
4 changes: 3 additions & 1 deletion src/Core/Settings.h
@@ -638,7 +638,7 @@ class IColumn;
M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \
M(UInt64, merge_tree_min_rows_for_concurrent_read_for_remote_filesystem, (20 * 8192), "If at least this many rows are read from one file, the reading can be parallelized when reading from a remote filesystem.", 0) \
M(UInt64, merge_tree_min_bytes_for_concurrent_read_for_remote_filesystem, (24 * 10 * 1024 * 1024), "If at least this many bytes are read from one file, the reading can be parallelized when reading from a remote filesystem.", 0) \
-M(UInt64, remote_read_min_bytes_for_seek, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes required for remote read (url, s3) to do seek, instead for read with ignore.", 0) \
+M(UInt64, remote_read_min_bytes_for_seek, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes required for remote read (url, s3) to do seek, instead of read with ignore.", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. Makes sense only for inserts via HTTP protocol. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data is flushed to table", 0) \
@@ -812,6 +812,8 @@ class IColumn;
M(Bool, input_format_orc_case_insensitive_column_matching, false, "Ignore case when matching ORC columns with CH columns.", 0) \
M(Bool, input_format_parquet_import_nested, false, "Allow to insert array of structs into Nested table in Parquet input format.", 0) \
M(Bool, input_format_parquet_case_insensitive_column_matching, false, "Ignore case when matching Parquet columns with CH columns.", 0) \
+/* TODO: Consider unifying this with https://github.com/ClickHouse/ClickHouse/issues/38755 */ \
+M(Bool, input_format_parquet_preserve_order, true, "Avoid reordering rows when reading from Parquet files. Usually makes it much slower.", 0) \
M(Bool, input_format_allow_seeks, true, "Allow seeks while reading in ORC/Parquet/Arrow input formats", 0) \
M(Bool, input_format_orc_allow_missing_columns, false, "Allow missing columns while reading ORC input formats", 0) \
M(Bool, input_format_parquet_allow_missing_columns, false, "Allow missing columns while reading Parquet input formats", 0) \
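A hedged sketch of where the new flag would typically land on the format side (an assumption for illustration, the exact wiring is not shown in this diff): query-level settings like this are copied into the FormatSettings struct that the Parquet input format reads, next to the neighbouring Parquet options listed above.

    /// Illustrative only: field names assumed by analogy with the
    /// neighbouring Parquet settings shown above.
    struct FormatSettings
    {
        struct
        {
            bool import_nested = false;                    /// input_format_parquet_import_nested
            bool case_insensitive_column_matching = false; /// input_format_parquet_case_insensitive_column_matching
            bool preserve_order = true;                    /// input_format_parquet_preserve_order
        } parquet;
    };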
4 changes: 0 additions & 4 deletions src/Dictionaries/HTTPDictionarySource.cpp
@@ -99,7 +99,6 @@ QueryPipeline HTTPDictionarySource::loadAll()
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries,
-ReadWriteBufferFromHTTP::Range{},
nullptr, false);

return createWrappedBuffer(std::move(in_ptr));
@@ -120,7 +119,6 @@ QueryPipeline HTTPDictionarySource::loadUpdatedAll()
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries,
-ReadWriteBufferFromHTTP::Range{},
nullptr, false);

return createWrappedBuffer(std::move(in_ptr));
@@ -150,7 +148,6 @@ QueryPipeline HTTPDictionarySource::loadIds(const std::vector<UInt64> & ids)
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries,
-ReadWriteBufferFromHTTP::Range{},
nullptr, false);

return createWrappedBuffer(std::move(in_ptr));
@@ -180,7 +177,6 @@ QueryPipeline HTTPDictionarySource::loadKeys(const Columns & key_columns, const
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries,
-ReadWriteBufferFromHTTP::Range{},
nullptr, false);

return createWrappedBuffer(std::move(in_ptr));
7 changes: 1 addition & 6 deletions src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp
@@ -146,12 +146,7 @@ void AsynchronousReadIndirectBufferFromRemoteFS::prefetch(int64_t priority)

void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t position)
{
-/// Do not reinitialize internal state in case the new end of range is already included.
-/// Actually it is likely that we will anyway reinitialize it as seek method is called after
-/// changing end position, but seek avoiding feature might help to avoid reinitialization,
-/// so this check is useful to save the prefetch for the time when we try to avoid seek by
-/// reading and ignoring some data.
-if (!read_until_position || position > *read_until_position)
+if (!read_until_position || position != *read_until_position)
{
read_until_position = position;

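Taken in isolation, the predicate change behaves as in this self-contained sketch (helper names and values are hypothetical, for illustration only): the old condition reinitialized only when the range grew, while the new one reinitializes on any change, so a shrunken range no longer keeps a stale, larger read-until boundary.

    #include <optional>

    static bool needsReinitOld(std::optional<size_t> end, size_t pos) { return !end || pos > *end; }
    static bool needsReinitNew(std::optional<size_t> end, size_t pos) { return !end || pos != *end; }

    /// needsReinitOld(100, 80) -> false: the stale boundary of 100 is kept.
    /// needsReinitNew(100, 80) -> true:  state is reset to the exact new end.
    /// Both return false for an unchanged end (pos == 100).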