Move checkpointing parallelism into TaskExecutor class, use that class for parallel union_by_name #12957


Merged · 3 commits merged into duckdb:main from Mytherin/parallelunionbyname on Jul 12, 2024

Conversation

Mytherin
Collaborator

#9999 introduced parallel checkpointing by adding a separate mechanism for managing tasks during the checkpointing process. This was necessary because checkpointing cannot easily use the normal parallelism loops: it can run outside of regular query execution (e.g. when shutting down a database, or during a commit).

This PR extracts the logic added in that PR into a separate class: the TaskExecutor. This class makes it easy to add parallelism in places where the regular parallelism model cannot be used. The TaskExecutor schedules tasks through the TaskScheduler; the tasks are then executed in parallel by the regular worker threads. On top of that, it provides a number of helper functions for tracking how many tasks have completed, and for propagating errors across threads.

In this PR we use the TaskExecutor to provide a parallel implementation of union_by_name file scanning. Since we perform auto-detection on every file independently, this is trivial to parallelize, and it can provide substantial speedups when running read_csv or read_parquet with union_by_name enabled over many small files. union_by_name is also a good showcase of how little code is needed to add parallelism using the TaskExecutor, e.g.:

```cpp
TaskExecutor executor(context);
// schedule tasks for all files
for (auto &file : files) {
	auto task = make_uniq<UnionByReaderTask>(....);
	executor.ScheduleTask(std::move(task));
}
// complete all tasks
executor.WorkOnTasks();
```

Benchmarks

Below are some timings of reading 1000 small CSV files. Source:

```sql
CREATE TABLE t1 AS SELECT i,i,i,i,i,i,i,i,i,i FROM range(0,2048) tbl(i);
COPY t1 TO 'small_csv.csv' (FORMAT CSV, HEADER 0);
SELECT * FROM read_csv(repeat(['small_csv.csv'], 1000), delim=',', header=0, union_by_name=true);
```
| v1.0 | main | new  |
|------|------|------|
| 3.9s | 3.2s | 0.6s |

@Mytherin Mytherin merged commit 9b35c08 into duckdb:main Jul 12, 2024
38 checks passed
github-actions bot pushed a commit to duckdb/duckdb-r that referenced this pull request Jul 17, 2024
Merge pull request duckdb/duckdb#12957 from Mytherin/parallelunionbyname
@Mytherin Mytherin deleted the parallelunionbyname branch August 4, 2024 08:33