Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-31769: [C++][Acero] Add spilling for hash join #13669

Closed
wants to merge 8 commits into from

Conversation

save-buffer
Copy link
Contributor

@save-buffer save-buffer commented Jul 21, 2022

Adds support for spilling data to disk during hash join.

@save-buffer save-buffer marked this pull request as draft July 21, 2022 05:03
@github-actions
Copy link

Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exciting to see this starting to come together. Not looking through in detail yet but picking at some of the points I suspect are going to be more contentious so we can start the conversation.

cpp/src/arrow/memory_pool_internal.h Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/spilling_util.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/spilling_util.cc Outdated Show resolved Hide resolved
@save-buffer save-buffer force-pushed the sasha_spilling2 branch 3 times, most recently from fad418d to a1b3b13 Compare July 26, 2022 02:50
@save-buffer save-buffer force-pushed the sasha_spilling2 branch 2 times, most recently from 62fa41a to d858c9f Compare August 3, 2022 22:21
@save-buffer save-buffer force-pushed the sasha_spilling2 branch 6 times, most recently from fcb3bf2 to 8d527a3 Compare August 11, 2022 23:22
Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Starting to poke around the edges of this PR. Can you explain to me the relationship between SpillingJoin and HashJoinNode?

cpp/src/arrow/compute/light_array.h Show resolved Hide resolved
cpp/src/arrow/util/io_util.h Outdated Show resolved Hide resolved
cpp/src/arrow/memory_pool.h Outdated Show resolved Hide resolved
cpp/src/arrow/compute/light_array.h Show resolved Hide resolved
cpp/src/arrow/compute/exec/accumulation_queue.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/spilling_util.h Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/spilling_util.h Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/spilling_util.h Outdated Show resolved Hide resolved
cpp/src/arrow/util/atomic_util.h Outdated Show resolved Hide resolved
Comment on lines 33 to 69
using OutputBatchCallback = std::function<void(int64_t, ExecBatch)>;
using BuildFinishedCallback = std::function<Status(size_t)>;
using FinishedCallback = std::function<void(int64_t)>;
using RegisterTaskGroupCallback = std::function<int(
std::function<Status(size_t, int64_t)>, std::function<Status(size_t)>)>;
using StartTaskGroupCallback = std::function<Status(int, int64_t)>;
using PauseProbeSideCallback = std::function<void(int)>;
using ResumeProbeSideCallback = std::function<void(int)>;
using AbortContinuationImpl = std::function<void()>;

struct CallbackRecord
{
OutputBatchCallback output_batch_callback;
BuildFinishedCallback build_finished_callback;
FinishedCallback finished_callback;
RegisterTaskGroupCallback register_task_group_;
StartTaskGroupCallback start_task_group_callback;
PauseProbeSideCallback pause_probe_side_callback;
AbortContinuationImpl abort_callback;
};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just use a pure virtual class at this point?

class HashJoinExternals {
  virtual void OutputBatch(int64_t, ExecBatch) = 0;
  // ...
};

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally find these pure-virtual classes cumbersome to deal with as they remove the callback record from being near the site where I invoke Init. They're also less flexible and don't let me reuse functions (like HashJoinImpl and SpillingHashJoin reuse a lot of the same callbacks, I can just assign the same stuff between the two callback records).

@westonpace westonpace self-requested a review August 11, 2022 23:33
@westonpace
Copy link
Member

@marsupialtail do you mind taking a look at spilling_file (and other parts of the PR if interested). Curious to get your feedback since you experimented with direct I/O as well.

}

if(pwritev(handle, ios.data(), static_cast<int>(ios.size()), info.start) == -1)
return Status::IOError("Failed to spill!");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I seem to recall a discussion here where we talked about the performance of using pwritev versus things like IO uring where you were able to saturate NVME SSD bandwidth. Were you able to saturate SSD with pwritev? I understand that when you are spilling many batches there might be many pwritevs happening at the same time. Still I am curious how the perf compares to IO uring -- this is to satisfy my (and maybe other people's) curiosity not to point out a problem with your code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are conflating two things: The IO command (pwritev) and the interface used to invoke it (syscall vs io_uring). io_uring lets you kick off a pwritev by writing into a ring buffer and invoking a memory barrier and allowing it to be executed on a kernel-mode thread. pwritev is a normal syscall that is synchronous, but I'm invoking it on a different user space thread in order to emulate asynchrony, so the net effect should be the same (but more cumbersome to write the code). I am using pwritev in both scenarios, just invoking it in two different ways.

That said, I will add a benchmark.

#ifdef __ANDROID__
const char *backup = "/data/local/tmp/";
#else
const char *backup = "/tmp/";
Copy link
Contributor

@marsupialtail marsupialtail Aug 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if I want to spill to an attached NVME SSD that is mounted on its own directory? E.g. on AWS instances with NVME SSD you usually mount it to a directory called /data or something

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now you can set one of the below environment variables, but eventually we'll flesh out QueryOptions to allow you to specify more options such as the temp directory.

const char *selectors[] = { "TMPDIR", "TMP", "TEMP", "TEMPDIR" };

@save-buffer save-buffer force-pushed the sasha_spilling2 branch 10 times, most recently from 8f07030 to 8f97bb2 Compare August 18, 2022 22:09
@save-buffer save-buffer force-pushed the sasha_spilling2 branch 3 times, most recently from 96c2370 to 8dcf8c7 Compare September 21, 2022 23:59
@save-buffer save-buffer force-pushed the sasha_spilling2 branch 3 times, most recently from fd7f00e to 47a1fca Compare September 24, 2022 02:27
@save-buffer save-buffer marked this pull request as ready for review January 6, 2023 21:20
Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution. I managed to get a pretty thorough look today so I hope future reviews can be faster (try not to squash changes past this point so it's easier for me to see what you've changed). I've got some suggestions.

Also, at the moment, when I run locally, both the benchmark and the unit tests hang forever. I am attempting to debug further but haven't look into it too much.

cpp/src/arrow/compute/exec/accumulation_queue.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/accumulation_queue.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/accumulation_queue.h Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/accumulation_queue.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/accumulation_queue.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/spilling_benchmark.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/spilling_benchmark.cc Outdated Show resolved Hide resolved
cpp/src/arrow/compute/exec/spilling_test.cc Outdated Show resolved Hide resolved
Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We spoke about this briefly offline, but I'll summarize here as well. These changes are roughly what we want. We need to figure out the CI failures at this point and refine the benchmarks & testing so they don't take so long in certain environments. Then we can go through, straighten out any last rough edges, and get this merged in.

@westonpace westonpace changed the title ARROW-16389: [C++][Acero] Add spilling for hash join GH-31769: [C++][Acero] Add spilling for hash join Jan 20, 2023
@github-actions
Copy link

@github-actions
Copy link

⚠️ GitHub issue #31769 has been automatically assigned in GitHub to PR creator.

@amol-
Copy link
Member

amol- commented Mar 30, 2023

Closing because it has been untouched for a while, in case it's still relevant feel free to reopen and move it forward 👍

@vkhodygo
Copy link

vkhodygo commented Aug 2, 2023

@save-buffer Any news regarding this one?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[C++] Support hash-join on larger than memory datasets
6 participants