Implementing Workstream::run() with Taskflow (no coloring) #17119
Conversation
Thank you, Ryan. FYI @bangerth (this is part 1 of several)
f998fdb to 05d5f85 (Compare)
The test mpi/mesh_worker_02 is failing. We will need to investigate.
Okay, the old test made the assumption that cells are visited in order. Fixed.
Can you reindent, @RyanMoulday?
d05c8e0 to ecc709c (Compare)
We seem to be hitting some random test failures/timeouts. I think we need to investigate before we can merge this.
This is waiting on #17131.
Good start. I think it would be nice if we had a way to re-use copier objects in the same way as you are re-using scratch objects. What does the TBB implementation do in this regard?
Utilities::MPI::MPI_InitFinalize mpi_initialization(
  argc, argv, testing_max_num_threads());
MPILogInitAll log;
// Disable multithreading so that text output order is consistent
Utilities::MPI::MPI_InitFinalize mpi_initialization(argc, argv, 1);
MPILogInitAll log;
Why is this necessary? What is different between the TBB and the TaskFlow implementation that makes it necessary to make this change?
Right now we don't support chunking in the TaskFlow implementation. This particular test (and the others changed) relies on the output being generated in a specific order as it compares line by line. I believe this order is not explicitly assured by the TBB implementation either but just happens to be consistent because this gets chunked to run sequentially with the default grain size in the TBB implementation.
The outputs for the TaskFlow implementation are correct; they just appear in a different order. I was advised that this would be the sensible change to make, as right now I don't think this test actually even runs on more than 1 thread per MPI rank with the TBB implementation.
I'm not sure that's true, but moreover I think that this essentially disables the test and that would be a shame. Can you show in which ways the output might differ depending on thread placement of tasks? Currently, the output of the whole test is this:
DEAL:0::DoFHandler ndofs=2
DEAL:0::*** 1. CELLS ***
DEAL:0::* own_cells=0 ghost_cells=0 own_faces=0 faces_to_ghost=0
DEAL:0::* own_cells=1 ghost_cells=0 own_faces=0 faces_to_ghost=0
DEAL:0::C 0_0:
DEAL:0::* own_cells=0 ghost_cells=1 own_faces=0 faces_to_ghost=0
DEAL:0::C 1_0:
DEAL:0::* own_cells=1 ghost_cells=1 own_faces=0 faces_to_ghost=0
DEAL:0::C 0_0:
DEAL:0::C 1_0:
DEAL:0::*** 2. FACES ***
DEAL:0::* own_cells=0 ghost_cells=0 own_faces=1 faces_to_ghost=0
DEAL:0::* own_cells=0 ghost_cells=0 own_faces=2 faces_to_ghost=0
DEAL:0::* own_cells=0 ghost_cells=0 own_faces=0 faces_to_ghost=1
DEAL:0::F cell1 = 0_0: face = 1 cell2 = 1_0: face2 = 0
DEAL:0::* own_cells=0 ghost_cells=0 own_faces=0 faces_to_ghost=2
DEAL:0::F cell1 = 0_0: face = 1 cell2 = 1_0: face2 = 0
Which lines are unreliably ordered?
Sorry to respond with the output of a different test but I just happen to have this one on hand.
Here is the expected output which TBB generates for Mesh_Worker_02:
DEAL:0::DoFHandler ndofs=7
DEAL:0::* own_cells=0 ghost_cells=0 own_faces=1 faces_to_ghost=0
DEAL:0::F cell1 = 0_1:1 face = 3 cell2 = 0_1:3 face2 = 2
DEAL:0::F cell1 = 0_1:2 face = 1 cell2 = 0_1:3 face2 = 0
DEAL:0::F cell1 = 0_2:00 face = 1 cell2 = 0_2:01 face2 = 0
DEAL:0::F cell1 = 0_2:00 face = 3 cell2 = 0_2:02 face2 = 2
DEAL:0::F cell1 = 0_2:01 face = 1 cell2 = 0_1:1 face2 = 0
DEAL:0::F cell1 = 0_2:01 face = 3 cell2 = 0_2:03 face2 = 2
DEAL:0::F cell1 = 0_2:02 face = 1 cell2 = 0_2:03 face2 = 0
DEAL:0::F cell1 = 0_2:02 face = 3 cell2 = 0_1:2 face2 = 2
DEAL:0::F cell1 = 0_2:03 face = 1 cell2 = 0_1:1 face2 = 0
DEAL:0::F cell1 = 0_2:03 face = 3 cell2 = 0_1:2 face2 = 2
DEAL:0::* own_cells=0 ghost_cells=0 own_faces=0 faces_to_ghost=1
DEAL:0::* own_cells=0 ghost_cells=0 own_faces=0 faces_to_ghost=2
And TaskFlow:
DEAL:0::DoFHandler ndofs=7
DEAL:0::* own_cells=0 ghost_cells=0 own_faces=1 faces_to_ghost=0
DEAL:0::F cell1 = 0_1:1 face = 3 cell2 = 0_1:3 face2 = 2
DEAL:0::F cell1 = 0_1:2 face = 1 cell2 = 0_1:3 face2 = 0
DEAL:0::F cell1 = 0_2:01 face = 1 cell2 = 0_1:1 face2 = 0
DEAL:0::F cell1 = 0_2:00 face = 1 cell2 = 0_2:01 face2 = 0
DEAL:0::F cell1 = 0_2:01 face = 3 cell2 = 0_2:03 face2 = 2
DEAL:0::F cell1 = 0_2:00 face = 3 cell2 = 0_2:02 face2 = 2
DEAL:0::F cell1 = 0_2:03 face = 1 cell2 = 0_1:1 face2 = 0
DEAL:0::F cell1 = 0_2:02 face = 1 cell2 = 0_2:03 face2 = 0
DEAL:0::F cell1 = 0_2:03 face = 3 cell2 = 0_1:2 face2 = 2
DEAL:0::F cell1 = 0_2:02 face = 3 cell2 = 0_1:2 face2 = 2
DEAL:0::* own_cells=0 ghost_cells=0 own_faces=0 faces_to_ghost=1
DEAL:0::* own_cells=0 ghost_cells=0 own_faces=0 faces_to_ghost=2
Basically, the test produces text output in the worker, which can be scheduled concurrently. The test used to work because the default chunk size with TBB is larger than the number of cells in this test. The proposed change makes sure the test will run sequentially.
I think this is okay because the purpose of the test is not parallel computing but checking that the mesh worker visits the correct cells and faces.
I see, thank you!
I think that the change here is still awkward because it bypasses all of the interesting stuff if we have no threads, which is what happens here. On the other hand, we do test WorkStream pretty heavily in many other tests, so that might not matter.
include/deal.II/base/work_stream.h
Outdated
# ifdef DEAL_II_WITH_TBB
# ifdef DEAL_II_WITH_TASKFLOW
        if (static_cast<const std::function<void(const CopyData &)> &>(copier))
          {
            // If we have a copier, run the algorithm:
            internal::taskflow_no_coloring::run(begin,
                                                end,
                                                worker,
                                                copier,
                                                sample_scratch_data,
                                                sample_copy_data,
                                                queue_length,
                                                chunk_size);
          }
        else
          {
            // There is no copier function. In this case, we have an
            // embarrassingly parallel problem where we can
            // essentially apply parallel_for. Because parallel_for
            // requires subdividing the range, for which operator-
            // between iterators is necessary, it is often inefficient
            // to apply it directly to cell ranges and similar iterator
            // types for which operator- is expensive or, in fact,
            // nonexistent. Rather, in that case, we simply copy the
            // iterators into a large array and use operator- on
            // iterators to this array of iterators.
            //
            // Instead of duplicating code, this is essentially the
            // same situation we have in the colored implementation
            // below, so we just defer to that place.
            std::vector<std::vector<Iterator>> all_iterators(1);
            for (Iterator p = begin; p != end; ++p)
              all_iterators[0].push_back(p);

            run(all_iterators,
                worker,
                copier,
                sample_scratch_data,
                sample_copy_data,
                queue_length,
                chunk_size);
          }

        // exit this function to not run the sequential version below:
        return;
# elif defined(DEAL_II_WITH_TBB)
The only place that's different between the two implementations is the if block. I think it might be nicer to write this as
#if defined(DEAL_II_WITH_TBB) || defined(DEAL_II_WITH_TASKFLOW)
if (have copier)
{
# if defined(DEAL_II_WITH_TASKFLOW)
   new code
# elif defined(DEAL_II_WITH_TBB)
old code
# endif
}
else
{
... no change in this part, just forward to the other version of run() ...
}
#endif
include/deal.II/base/work_stream.h
Outdated
  template <typename Worker,
            typename Copier,
            typename Iterator,
            typename ScratchData,
            typename CopyData>

  /**
   * The last two arguments in this function are for chunking support, which
   * currently does not exist but ideally will later. For now they are
   * ignored but are still here to permit existing programs to function.
   */
  void
  run(const Iterator &begin,
Put the comment above the declaration (which starts with template). In general, start by saying what the function does, rather than with a comment about its arguments.
include/deal.II/base/work_stream.h
Outdated
   */
  void
  run(const Iterator &begin,
      const typename identity<Iterator>::type &end,
Use std_cxx20::type_identity_t<Iterator> as elsewhere in this file.
include/deal.II/base/work_stream.h
Outdated
      const unsigned int queue_length = 2 * MultithreadInfo::n_threads(),
      const unsigned int chunk_size = 8)
You'll get warnings about unused arguments if you don't un-name these arguments:
      const unsigned int /*queue_length*/ = 2 * MultithreadInfo::n_threads(),
      const unsigned int /*chunk_size*/ = 8)
include/deal.II/base/work_stream.h
Outdated
      // This is used to connect each worker to its copier, as communication
      // between tasks is not supported.
      unsigned int idx = 0;
In general, starting a sentence with "This" is awkward because it's unclear what "this" refers to. Did you mean "The following variable..."? If so, however, I don't understand what the comment is saying. How are workers using this variable to make this connection?
include/deal.II/base/work_stream.h
Outdated
      std::vector<std::unique_ptr<CopyData>> copy_datas;

      for (Iterator i = begin; i != end; ++i, ++idx)
        {
          copy_datas.emplace_back();
          // Create a worker task.
          auto worker_task =
            taskflow
              .emplace([it = i,
                        idx,
                        &data,
                        &sample_scratch_data,
                        &sample_copy_data,
                        &copy_datas,
This is not going to be thread-safe. You are taking a reference to copy_datas, which you access via copy_datas[idx] below from the worker thread, but at the same time the thread that spawns all of these tasks is modifying copy_datas in line 745.
(Or perhaps what you are saying is that you are only creating the task objects here, not yet running them, and all of the accesses once the tasks are running only ever modify a single entry of the vector. That is ok, but it would be nice to describe this in a comment.)
include/deal.II/base/work_stream.h
Outdated
      using ScratchDataList = std::list<ScratchDataObjects>;

      Threads::ThreadLocalStorage<ScratchDataList> data;
Perhaps give this a better name, like scratch_data_list?
      // Ensure that only one copy task can run at a time.
      if (!last_copier.empty())
        last_copier.precede(copier_task);
      last_copier = copier_task;
What are the semantics of copying tasks? Does the copy point to the same task, or is it a separate object? This may be useful to document at this place.
I think this would still be good to address.
@RyanMoulday Can you add a comment above this assignment that says:
// Keep a handle to the last copier. Tasks in taskflow are basically handles to internally stored data, so this does not perform a copy:
I think this is a limitation of the current static tasking idea, where the task graph is generated before we start running anything. Each copy task is a totally separate task that only knows it needs to run after its worker task (and after all previous copy tasks) but cannot receive any information from its worker task. As a result, the copy task is usually handled by a thread that did not handle the worker task, so a thread-local copy object doesn't work, and many more copy objects may exist than scratch objects.
I imagine the correct way to get around this is the parallel pipeline implementation, which is what TBB uses right now. TaskFlow also has its own parallel pipeline, which could be used. The structures are pretty similar, so this may be feasible with just slight edits to the TBB implementation if we want to go that route (this current code would basically be discarded).
@RyanMoulday Can you please also rebase to the most current master? This should fix the nedelec tests.
@bangerth There are some shortcomings in the current approach (no reuse of Copy objects, no chunking, storing a large vector of pointers, etc.). Nevertheless, Ryan measured the same or better performance than TBB. I was encouraged by the results and thought it might be worthwhile to get these initial versions merged and later think about optimizing the implementation. Thoughts?
ecc709c to 79d0fc7 (Compare)
If there is a commitment to address some of these issues in future patches, I'm okay with an incremental approach. I do think that it would be interesting to try out the TaskFlow pipeline implementation. Having a non-pipeline implementation already merged might make for a nice baseline.
Ryan, is this ready for another round of reviews?
Yes, I have made the requested changes. The only outstanding item is the tests and what we will do with them.
I started looking into the tf::Pipeline and I have to admit that it is very hard to understand. We will give it a try, though.
On 6/26/24 16:15, Timo Heister wrote:
I started looking into the tf::Pipeline and I have to admit that it is
very hard to understand. We will give it a try, though.
I see you already found my open issue about documentation. It's good to
know that it's not just me then...
include/deal.II/base/work_stream.h
Outdated
    template <typename Iterator, typename ScratchData, typename CopyData>
    struct ScratchDataObject
    {
      std::unique_ptr<ScratchData> scratch_data;
      bool                         currently_in_use;

      /**
       * Default constructor.
       */
      ScratchDataObject()
        : currently_in_use(false)
      {}

      ScratchDataObject(std::unique_ptr<ScratchData> &&p, const bool in_use)
        : scratch_data(std::move(p))
        , currently_in_use(in_use)
      {}

      ScratchDataObject(ScratchData *p, const bool in_use)
        : scratch_data(p)
        , currently_in_use(in_use)
      {}

      // Provide a copy constructor that actually doesn't copy the
      // internal state. This makes handling ScratchAndCopyDataObjects
      // easier to handle with STL containers.
      ScratchDataObject(const ScratchDataObject &)
        : currently_in_use(false)
      {}

      ScratchDataObject(ScratchDataObject &&o) noexcept = default;
    };
On second reading, I don't see why this class has Iterator and CopyData as template arguments. They do not seem to be used.
include/deal.II/base/work_stream.h
Outdated
      unsigned int idx = 0;

      std::vector<std::unique_ptr<CopyData>> copy_datas;

      // Generate a static task graph. Here we generate a task for each cell
      // that will be worked on. The tasks are not executed until all of them
      // are created; this code runs sequentially.
      for (Iterator i = begin; i != end; ++i, ++idx)
        {
          copy_datas.emplace_back();
          // Create a worker task.
          auto worker_task =
            taskflow
              .emplace([it = i,
                        idx,
This is not wrong, but it made me wonder whether there is a good reason to capture the iterator i via a local renamed variable it, whereas you just capture idx with its old name. Doing the same thing in different ways is a good way to confuse the reader, so my preference would be to use one style or the other.
Agreed. @RyanMoulday Can you please rename i to it in the for loop above and then just capture it?
      // Ensure that only one copy task can run at a time.
      if (!last_copier.empty())
        last_copier.precede(copier_task);
      last_copier = copier_task;
I think this would still be good to address.
Utilities::MPI::MPI_InitFinalize mpi_initialization(
  argc, argv, testing_max_num_threads());
MPILogInitAll log;
// Disable multithreading so that text output order is consistent
Utilities::MPI::MPI_InitFinalize mpi_initialization(argc, argv, 1);
MPILogInitAll log;
I think that the change here is still awkward because it bypasses all of the interesting stuff if we have no threads, which is what happens here. On the other hand, we do test WorkStream pretty heavily in many other tests, so that might not matter.
You should tell the original author about the bad test design (printing from a worker task! edit: this was me, btw :-) ). But I think this is acceptable, as the test did not run in parallel before either, and it is not made to test that workstream runs in parallel.
I've gone ahead and made the most recent requested changes.
This grid/intergrid_constraints.debug timeout seems to be a recurring issue. I am unable to recreate it when running the test suite on my own device, so investigating the cause is difficult. The test seems to be for compute_intergrid_weights, and within that function the helper function compute_intergrid_weights_2 makes a workstream call, so perhaps the issue is somewhere in there.
It does not fail on master, so I think this is related to your changes here. The test might already be slow to begin with, but we will need to check.
It's definitely related to these changes, but I was trying to figure out where this test actually invokes the new code, and that was the main place I could easily see it happening. The test is not remarkably slow when it passes; on the runs where it did pass it took 15-30 seconds. The timeout is 1200 seconds, so there is definitely something weird going on.
Agreed. When it passes on your branch it only takes a few seconds: https://ci.tjhei.info/job/dealii-serial/job/PR-17119/13/testReport/projectroot.tests.grid.intergrid_constraintsdebug/ Do we have a deadlock in this test? Does it pass every time you run it on your computer? (you can
Exactly, except if there is a bug within taskflow.
Ryan and I figured out that the hang disappears if we disable the taskflow async() calls that we use for background tasks. This seems like we might be hitting a bug in the taskflow scheduler. We could move forward with this PR if we disable the async() functionality. Otherwise, we would need to a) make a small program that can reproduce the hang, b) get it fixed in taskflow (assuming it is indeed a bug), c) wait for the next taskflow release, and d) then merge this PR.
@bangerth Any thoughts on the current situation? How sure are you that dealii/include/deal.II/base/thread_management.h Lines 960 to 965 in d25bc7b is correct and cannot cause an issue?
I looked at the hang one more time and I observe the following behavior:
Threads 2-20 belong to TBB and are idle. Thread 21 seems to be an idle taskflow worker:
Threads 22/23 are taskflow workers stuck waiting for a mutex; both are tasks in the workstream:
I don't understand why the thread that holds the lock is not making any progress. Does that mean taskflow interrupted the task while it is holding the lock?
On 7/5/24 03:41, Timo Heister wrote:
@bangerth <https://github.com/bangerth> Any thoughts on the current situation?
How sure are you that
https://github.com/dealii/dealii/blob/d25bc7b6a69a8d4204cab10b88acbeded04ee040/include/deal.II/base/thread_management.h#L960-L965 <https://github.com/dealii/dealii/blob/d25bc7b6a69a8d4204cab10b88acbeded04ee040/include/deal.II/base/thread_management.h#L960-L965>
is correct and can not cause an issue?
Reasonably sure -- the best I can say about TaskFlow. I've been running with
TaskFlow tasks for weeks locally and have not had problems with it. My best
guess is that the problem is with this patch here.
This is confusing me as well. If you have two threads that are stuck waiting for a mutex, who is currently holding the mutex? It must be one of the other threads, but it's not clear from what you show which one that would be. In any of the other threads, is one of the higher frames also in
(Separately, I think you can configure with TaskFlow and without TBB. In that case, you don't have to deal with all of the TBB worker threads.)
In fact, it's enough if any of the threads in a higher frame is somewhere in the middle of
Yes, you are indeed correct. I didn't read far enough down. The workstream function indirectly calls get_prolongation_matrix(), which waits for tasks to complete, which taskflow uses to corun another task of the workstream that then hangs indefinitely:
Such is the peril with tasks: you can't spawn new tasks in a region you protect with a mutex. So the bug is here:
Ugh, what a rats' nest :-(
@RyanMoulday can you please rebase to the current master branch?
086657c to e7144c0 (Compare)
e7144c0 to e14716c (Compare)
This might be good to go now that we have a workaround for the deadlock. @RyanMoulday is there anything else missing, do you think?
The only thing left I can think of is setting up a guard within the workstream implementation similar to the one in the tasking module, where we corun if we are already within a taskflow task. I think it's theoretically possible that if a workstream were to generate another workstream directly, we could lock up, since the calling thread doesn't participate. Not sure if having a workstream directly within a workstream is something we ever do, though.
Let's get this merged. As mentioned above, I think there are a number of things we should improve (chunking, re-use of objects), which I hope will get addressed in follow-up PRs. But we have a bit of time until the next release.
@RyanMoulday Would you mind opening an issue that lists the things we still want to improve?