
ARROW-15265: [C++] Fix hang in dataset writer with kDeleteMatchingPartitions and #partitions >= 8 #12099

Closed
wants to merge 10 commits

Conversation

@lidavidm (Member) commented Jan 7, 2022

When the dataset writer is configured to delete existing data before writing, the target directory is on S3, the dataset is partitioned, and there are at least as many partitions as threads in the I/O thread pool, the writer hangs. The writer spawns a task on the I/O thread pool for each partition to delete that partition's existing data. However, S3FS implements the relevant filesystem call by asynchronously listing the objects on the same I/O thread pool, then deleting them, blocking until this is done. This nested asynchrony causes the program to hang.

The fix is to perform the deletion fully asynchronously, so that nothing blocks. It is sufficient to use the default implementation of the async filesystem methods; that just spawns another task on the I/O thread pool, but it lets the writer avoid blocking. This PR also refactors the S3FS internals to implement the call truly asynchronously.

This PR also implements FileInterface::CloseAsync. This is needed because, by default on S3, files perform writes asynchronously in the background and Close() blocks until those writes complete. Both the blocking and the background writes consume the I/O thread pool, so an async version is required to avoid the deadlock.
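
For illustration, a minimal sketch of the before/after shape of the per-partition cleanup (not the PR's actual code; the helper names are illustrative and the use of DeleteDirContentsAsync is based on the async filesystem methods the description refers to):

#include <memory>
#include <string>
#include "arrow/filesystem/filesystem.h"
#include "arrow/io/interfaces.h"
#include "arrow/util/future.h"

// Before: the writer blocks an I/O-pool thread while S3FS queues more I/O-pool
// work underneath it, so once every pool thread is a waiter, nothing can run.
arrow::Future<> CleanupPartitionBlocking(std::shared_ptr<arrow::fs::FileSystem> fs,
                                         const arrow::io::IOContext& io_context,
                                         std::string dir) {
  return arrow::DeferNotOk(io_context.executor()->Submit(
      [fs, dir] { return fs->DeleteDirContents(dir); }));  // blocks inside the pool
}

// After: use the async variant so the task completes without blocking a thread.
arrow::Future<> CleanupPartitionAsync(std::shared_ptr<arrow::fs::FileSystem> fs,
                                      std::string dir) {
  return fs->DeleteDirContentsAsync(dir);
}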

@github-actions bot commented Jan 7, 2022

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@lidavidm lidavidm marked this pull request as draft January 7, 2022 18:44
@lidavidm (Member, Author) commented Jan 7, 2022

Hmm, this sometimes hangs with >8 partitions; taking another look.

@westonpace (Member) left a comment

Thanks, looks good. Just one small change in how you are invoking it at the top level.

Before:
init_future_ =
    DeferNotOk(write_options_.filesystem->io_context().executor()->Submit([this] {

After:
init_future_ = DeferNotOk(
    write_options_.filesystem->io_context().executor()->Submit([this]() -> Future<> {
Member

It would probably be a good addition to allow executor submission tasks to return futures, but I don't think that actually works today. The future you are returning from the task will be discarded without being awaited.

I think you'll need to change it to...

init_future_ = SubmitTask(CreateDir).Then(DeleteDirContentsAsync);
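
As a hedged sketch of that chaining (the function and variable names here are illustrative, not the dataset writer's actual code): a Future<> returned from a Submit()ed lambda is never awaited by the pool, so the second step has to be attached with Then() instead of being returned from inside the task.

#include <memory>
#include <string>
#include "arrow/filesystem/filesystem.h"
#include "arrow/io/interfaces.h"
#include "arrow/util/future.h"

arrow::Future<> ResetDirectoryAsync(std::shared_ptr<arrow::fs::FileSystem> fs,
                                    const arrow::io::IOContext& io_context,
                                    std::string base_dir) {
  // Step 1: create the directory on the I/O pool (a cheap, synchronous call).
  auto created = arrow::DeferNotOk(io_context.executor()->Submit(
      [fs, base_dir] { return fs->CreateDir(base_dir); }));
  // Step 2: chain the async deletion so it is awaited, not discarded.
  return created.Then(
      [fs, base_dir] { return fs->DeleteDirContentsAsync(base_dir); });
}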

@lidavidm (Member, Author) commented Jan 7, 2022

Ah, there's another deadlock: finishing a FileWriter closes the underlying file. This is done as a continuation that runs on the I/O thread pool (I think). (On a side note, would anyone complain if I #ifdef'd in the pthread calls to name threads on Linux to make debugging easier?) On S3, closing a file blocks on a condition variable until all background writes are finished…and of course, those background writes are on the I/O thread pool, like everything else, so we hang.

@lidavidm (Member, Author) commented Jan 7, 2022

And there's definitely a race condition somewhere… (using the reproducer from JIRA)

8 partitions
Traceback (most recent call last):
  File "/home/lidavidm/Code/upstream/arrow-15265/python/test.py", line 37, in <module>
    ds.write_dataset(
  File "/home/lidavidm/Code/upstream/arrow-15265/python/pyarrow/dataset.py", line 931, in write_dataset
    _filesystemdataset_write(
  File "pyarrow/_dataset.pyx", line 2658, in pyarrow._dataset._filesystemdataset_write
    check_status(CFileSystemDataset.Write(c_options, c_scanner))
  File "pyarrow/error.pxi", line 114, in pyarrow.lib.check_status
    raise IOError(message)
OSError: Path does not exist 'my-bucket/test8.parquet/col1=b'

@westonpace (Member)

I think finishing a file actually happens on the CPU thread pool at the moment. Although it's at the mercy of the writer.

@westonpace (Member)

On a side note, would anyone complain if I #ifdef'd in the pthread calls to name threads on Linux to make debugging easier?

Please do.
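
Not part of this PR, but a minimal sketch of the kind of #ifdef'd thread naming being proposed, using pthread_setname_np on Linux (a glibc extension that caps names at 15 characters plus the terminator):

#ifndef _GNU_SOURCE
#define _GNU_SOURCE  // for pthread_setname_np on glibc
#endif
#include <string>
#ifdef __linux__
#include <pthread.h>
#endif

// Name the calling thread so it shows up as e.g. "Io-7" in gdb's `info thread`.
void SetCurrentThreadName(const std::string& name) {
#ifdef __linux__
  pthread_setname_np(pthread_self(), name.substr(0, 15).c_str());
#else
  (void)name;  // no portable fallback sketched here
#endif
}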

@westonpace (Member)

I think finishing a file actually happens on the CPU thread pool at the moment. Although it's at the mercy of the writer.

Ah, but the background Close/Wait also blocks on the CPU thread pool. So I think you're filling up the CPU thread pool in this case.

@westonpace (Member)

Moving the Close/Wait to the I/O thread pool will probably be an easy fix. Then the rules we are building are...

  • If you are going to call a synchronous filesystem method (e.g. Close, DeleteDirContents) then you must be on the CPU thread pool.
  • If you are a synchronous filesystem method then you are only allowed to spawn one I/O task and wait for it.

...I think that will prevent any sort of nested deadlock.
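
A hedged illustration of the second rule (not Arrow's actual implementation): a synchronous filesystem method can be written as "start the async version, then block exactly once", and the deadlock only appears if that blocking call itself runs on the I/O pool.

#include <string>
#include "arrow/filesystem/filesystem.h"
#include "arrow/status.h"

// Safe from a CPU-pool (or user) thread: one I/O task is spawned and awaited.
// Unsafe from an I/O-pool thread: the wait can starve the pool it depends on.
arrow::Status DeleteDirContentsSync(arrow::fs::FileSystem* fs,
                                    const std::string& dir) {
  return fs->DeleteDirContentsAsync(dir).status();  // blocks until the future is done
}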

@lidavidm (Member, Author) commented Jan 7, 2022

Surprisingly it is the I/O thread pool:

(gdb) info thread
  Id   Target Id         Frame 
* 1    Thread 0x7f8856a04740 (LWP 27248) "python" 0x00007f88565f5ad3 in futex_wait_cancelable (private=<optimized out>, 
    expected=0, futex_word=0x5622ee0e97e8) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
...snip...
  29   Thread 0x7f879e7fc700 (LWP 27276) "Io-7" 0x00007f88565f5ad3 in futex_wait_cancelable (private=<optimized out>, 
    expected=0, futex_word=0x7f87900051e0) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  30   Thread 0x7f879d3ff700 (LWP 27277) "jemalloc_bg_thd" 0x00007f88565f5ad3 in futex_wait_cancelable (
    private=<optimized out>, expected=0, futex_word=0x7f87cd40a794) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  31   Thread 0x7f878ffff700 (LWP 27278) "jemalloc_bg_thd" 0x00007f88565f5ad3 in futex_wait_cancelable (
    private=<optimized out>, expected=0, futex_word=0x7f87cd40a864) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
(gdb) t 29
[Switching to thread 29 (Thread 0x7f879e7fc700 (LWP 27276))]
#0  0x00007f88565f5ad3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x7f87900051e0)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
88	in ../sysdeps/unix/sysv/linux/futex-internal.h
(gdb) bt
#0  0x00007f88565f5ad3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x7f87900051e0)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
#1  __pthread_cond_wait_common (abstime=0x0, mutex=0x7f8790005190, cond=0x7f87900051b8) at pthread_cond_wait.c:502
#2  __pthread_cond_wait (cond=0x7f87900051b8, mutex=0x7f8790005190) at pthread_cond_wait.c:655
#3  0x00007f87cdd374ad in __gthread_cond_wait (
    __mutex=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, 
    __cond=<optimized out>)
    at /home/conda/feedstock_root/build_artifacts/gcc_compilers_1628138005912/work/build/x86_64-conda-linux-gnu/libstdc++-v3/src/c++11/condition_variable.cc:865
#4  std::__condvar::wait (__m=<error reading variable: dwarf2_find_location_expression: Corrupted DWARF expression.>, 
    this=<optimized out>) at ../../../../../libstdc++-v3/src/c++11/gthr-default.h:155
#5  std::condition_variable::wait (this=<optimized out>, __lock=...)
    at ../../../../../libstdc++-v3/src/c++11/condition_variable.cc:41
#6  0x00007f87d1420c11 in std::condition_variable::wait<arrow::fs::(anonymous namespace)::ObjectOutputStream::Flush()::{lambda()#1}>(std::unique_lock<std::mutex>&, arrow::fs::(anonymous namespace)::ObjectOutputStream::Flush()::{lambda()#1}) (
    this=0x7f87900051b8, __lock=..., __p=...)
    at /usr/lib/gcc/x86_64-linux-gnu/7.5.0/../../../../include/c++/7.5.0/condition_variable:99
#7  0x00007f87d141ba1e in arrow::fs::(anonymous namespace)::ObjectOutputStream::Flush (this=0x7f879000add0)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/filesystem/s3fs.cc:1301
#8  0x00007f87d141bfe0 in arrow::fs::(anonymous namespace)::ObjectOutputStream::Close (this=0x7f879000add0)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/filesystem/s3fs.cc:1218
#9  0x00007f87d141c66d in virtual thunk to arrow::fs::(anonymous namespace)::ObjectOutputStream::Close() ()
    at /usr/lib/gcc/x86_64-linux-gnu/7.5.0/../../../../include/c++/7.5.0/bits/hashtable.h:492
#10 0x00007f87c7d7d907 in arrow::dataset::FileWriter::Finish (this=0x7f8788007d80)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/dataset/file_base.cc:322
#11 0x00007f87c7d59040 in arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::DoFinish (
    this=0x7f87b000b5f0) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/dataset/dataset_writer.cc:221
#12 0x00007f87c7d58ee6 in arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::DoDestroy()::{lambda()#1}::operator()() const (this=0x7f87b00088c8)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/dataset/dataset_writer.cc:196
#13 0x00007f87c7d58e80 in arrow::detail::ContinueFuture::operator()<arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::DoDestroy()::{lambda()#1}, , arrow::Status, arrow::Future<arrow::internal::Empty> > (
    this=0x7f879e7fb268, next=..., f=...) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.h:148
#14 0x00007f87c7d58d42 in arrow::detail::ContinueFuture::IgnoringArgsIf<arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::DoDestroy()::{lambda()#1}, arrow::Future<arrow::internal::Empty>, arrow::internal const&>(std::integral_constant<bool, true>, arrow::Future<arrow::internal::Empty>&&, arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::DoDestroy()::{lambda()#1}&&, arrow::internal const&) const (this=0x7f879e7fb268, next=..., f=...)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.h:186
#15 0x00007f87c7d58c3d in arrow::Future<arrow::internal::Empty>::ThenOnComplete<arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::DoDestroy()::{lambda()#1}, arrow::Future<arrow::internal::Empty>::PassthruOnFailure<{lambda()#1}> >::operator()(arrow::Result<arrow::internal::Empty> const&) && (this=0x7f87b00088c8, result=...)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.h:599
#16 0x00007f87c7d58bb2 in arrow::Future<arrow::internal::Empty>::WrapResultyOnComplete::Callback<arrow::Future<arrow::internal::Empty>::ThenOnComplete<arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::DoDestroy()::{lambda()#1}, arrow::Future<arrow::internal::Empty>::PassthruOnFailure<{lambda()#1}> > >::operator()(arrow::FutureImpl const&) && (this=0x7f87b00088c8, impl=...) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.h:496
#17 0x00007f87c7d58b76 in arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::FnImpl<arrow::Future<arrow::internal::Empty>::WrapResultyOnComplete::Callback<arrow::Future<arrow::internal::Empty>::ThenOnComplete<arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::DoDestroy()::{lambda()#1}, arrow::Future<arrow::internal::Empty>::PassthruOnFailure<{lambda()#1}> > > >::invoke(arrow::FutureImpl const&) (this=0x7f87b00088c0, a=...)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/functional.h:152
#18 0x00007f87d023220f in arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::operator()(arrow::FutureImpl const&) && (this=0x7f87b00065e0, a=...) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/functional.h:140
#19 0x00007f87d0231b61 in arrow::ConcreteFutureImpl::RunOrScheduleCallback (self=..., callback_record=..., 
    in_add_callback=false) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.cc:298
#20 0x00007f87d023160f in arrow::ConcreteFutureImpl::DoMarkFinishedOrFailed (this=0x7f87b000ba60, 
    state=arrow::FutureState::SUCCESS) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.cc:327
#21 0x00007f87d022f9ca in arrow::ConcreteFutureImpl::DoMarkFinished (this=0x7f87b000ba60)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.cc:231
#22 0x00007f87d022cc9d in arrow::FutureImpl::MarkFinished (this=0x7f87b000ba60)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.cc:383
#23 0x00007f87d01ce812 in arrow::Future<arrow::internal::Empty>::DoMarkFinished (this=0x7f87b000b688, res=...)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.h:712
#24 0x00007f87d01ccfa4 in arrow::Future<arrow::internal::Empty>::MarkFinished<arrow::internal::Empty, void> (
    this=0x7f87b000b688, s=...) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.h:463
#25 0x00007f87d01cb4ed in arrow::util::SerializedAsyncTaskGroup::ConsumeAsMuchAsPossibleUnlocked (this=0x7f87b000b688, 
    guard=...) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/async_util.cc:143
#26 0x00007f87d01cc9c0 in arrow::util::SerializedAsyncTaskGroup::TryDrainUnlocked()::$_2::operator()() const::{lambda(arrow::Status const&)#1}::operator()(arrow::Status const&) const (this=0x7f8788009c38, st=...)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/async_util.cc:164
#27 0x00007f87d01cc92a in arrow::Future<arrow::internal::Empty>::WrapStatusyOnComplete::Callback<arrow::util::SerializedAsyncTaskGroup::TryDrainUnlocked()::$_2::operator()() const::{lambda(arrow::Status const&)#1}>::operator()(arrow::FutureImpl const&) && (this=0x7f8788009c38, impl=...) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.h:509
#28 0x00007f87d01cc8e6 in arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::FnImpl<arrow::Future<arrow::internal::Empty>::WrapStatusyOnComplete::Callback<arrow::util::SerializedAsyncTaskGroup::TryDrainUnlocked()::$_2::operator()() const::{lambda(arrow::Status const&)#1}> >::invoke(arrow::FutureImpl const&) (this=0x7f8788009c30, a=...)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/functional.h:152
#29 0x00007f87d023220f in arrow::internal::FnOnce<void (arrow::FutureImpl const&)>::operator()(arrow::FutureImpl const&) && (this=0x7f8788002610, a=...) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/functional.h:140
#30 0x00007f87d0231b61 in arrow::ConcreteFutureImpl::RunOrScheduleCallback (self=..., callback_record=..., 
    in_add_callback=false) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.cc:298
#31 0x00007f87d023160f in arrow::ConcreteFutureImpl::DoMarkFinishedOrFailed (this=0x7f8794019890, 
    state=arrow::FutureState::SUCCESS) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.cc:327
#32 0x00007f87d022f9ca in arrow::ConcreteFutureImpl::DoMarkFinished (this=0x7f8794019890)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.cc:231
#33 0x00007f87d022cc9d in arrow::FutureImpl::MarkFinished (this=0x7f8794019890)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.cc:383
#34 0x00007f87d01ce812 in arrow::Future<arrow::internal::Empty>::DoMarkFinished (this=0x7f879e7fbac0, res=...)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.h:712
#35 0x00007f87d01ccfa4 in arrow::Future<arrow::internal::Empty>::MarkFinished<arrow::internal::Empty, void> (
    this=0x7f879e7fbac0, s=...) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.h:463
#36 0x00007f87c7d5823d in _ZNK5arrow6detail14ContinueFutureclIRZNS_7dataset8internal12_GLOBAL__N_122DatasetWriterFileQueue9WriteNextESt10shared_ptrINS_11RecordBatchEEE9WriteTaskJENS_6StatusENS_6FutureINS_8internal5EmptyEEEEENSt9enable_ifIXaaaantsr3std7is_voidIT1_EE5valuentsr9is_futureISI_EE5valueoontsrT2_8is_emptysr3std7is_sameISI_SC_EE5valueEvE4typeESJ_OT_DpOT0_
    (this=0x7f8788003fe8, next=..., f=...) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/future.h:148
#37 0x00007f87c7d581b3 in std::__invoke_impl<void, arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::WriteNext(std::shared_ptr<arrow::RecordBatch>)::WriteTask&>(std::__invoke_other, arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::WriteNext(std::shared_ptr<arrow::RecordBatch>)::WriteTask&) (
    __f=..., __args=..., __args=...)
    at /usr/lib/gcc/x86_64-linux-gnu/7.5.0/../../../../include/c++/7.5.0/bits/invoke.h:60
#38 0x00007f87c7d580e7 in std::__invoke<arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::WriteNext(std::shared_ptr<arrow::RecordBatch>)::WriteTask&>(arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::WriteNext(std::shared_ptr<arrow::RecordBatch>)::WriteTask&) (__fn=..., __args=..., __args=...)
    at /usr/lib/gcc/x86_64-linux-gnu/7.5.0/../../../../include/c++/7.5.0/bits/invoke.h:95
#39 0x00007f87c7d58092 in std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::WriteNext(std::shared_ptr<arrow::RecordBatch>)::WriteTask)>::__call<void, , 0ul, 1ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul>) (this=0x7f8788003fe8, __args=...)
    at /usr/lib/gcc/x86_64-linux-gnu/7.5.0/../../../../include/c++/7.5.0/functional:467
#40 0x00007f87c7d58026 in std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::WriteNext(std::shared_ptr<arrow::RecordBatch>)::WriteTask)>::operator()<, void>() (this=0x7f8788003fe8)
    at /usr/lib/gcc/x86_64-linux-gnu/7.5.0/../../../../include/c++/7.5.0/functional:549
#41 0x00007f87c7d57ff1 in arrow::internal::FnOnce<void ()>::FnImpl<std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, arrow::dataset::internal::(anonymous namespace)::DatasetWriterFileQueue::WriteNext(std::shared_ptr<arrow::RecordBatch>)::WriteTask)> >::invoke() (this=0x7f8788003fe0)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/functional.h:152
#42 0x00007f87d027e85a in arrow::internal::FnOnce<void ()>::operator()() && (this=0x7f879e7fbd10)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/functional.h:140
#43 0x00007f87d027e03f in arrow::internal::WorkerLoop (state=..., it=...)
    at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/thread_pool.cc:185
#44 0x00007f87d027dce3 in arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_3::operator()() const (
    this=0x7f87a0011578) at /home/lidavidm/Code/upstream/arrow-15265/cpp/src/arrow/util/thread_pool.cc:363
#45 0x00007f87d027dc6d in _ZSt13__invoke_implIvZN5arrow8internal10ThreadPool21LaunchWorkersUnlockedEiE3$_3JEET_St14__invoke_otherOT0_DpOT1_ (__f=...) at /usr/lib/gcc/x86_64-linux-gnu/7.5.0/../../../../include/c++/7.5.0/bits/invoke.h:60
#46 0x00007f87d027dbfd in _ZSt8__invokeIZN5arrow8internal10ThreadPool21LaunchWorkersUnlockedEiE3$_3JEENSt15__invoke_resultIT_JDpT0_EE4typeEOS5_DpOS6_ (__fn=...)
    at /usr/lib/gcc/x86_64-linux-gnu/7.5.0/../../../../include/c++/7.5.0/bits/invoke.h:95
#47 0x00007f87d027dbd5 in std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_3> >::_M_invoke<0ul> (this=0x7f87a0011578) at /usr/lib/gcc/x86_64-linux-gnu/7.5.0/../../../../include/c++/7.5.0/thread:234
#48 0x00007f87d027dba5 in std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_3> >::operator()() (this=0x7f87a0011578) at /usr/lib/gcc/x86_64-linux-gnu/7.5.0/../../../../include/c++/7.5.0/thread:243
#49 0x00007f87d027da39 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_3> > >::_M_run() (this=0x7f87a0011570)
    at /usr/lib/gcc/x86_64-linux-gnu/7.5.0/../../../../include/c++/7.5.0/thread:186
#50 0x00007f87cdd3b9b0 in std::execute_native_thread_routine (__p=<optimized out>)
    at /home/conda/feedstock_root/build_artifacts/gcc_compilers_1628138005912/work/build/x86_64-conda-linux-gnu/libstdc++-v3/include/bits/new_allocator.h:82
#51 0x00007f88565ef6db in start_thread (arg=0x7f879e7fc700) at pthread_create.c:463
#52 0x00007f885596b71f in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

@lidavidm (Member, Author) commented Jan 7, 2022

Thanks for the help, Weston; this last commit should fix the deadlock…though I still occasionally see that OSError. I think we can try to track that down separately.

@lidavidm (Member, Author) commented Jan 7, 2022

  1. Filed ARROW-15285 for the OSError, though it seems quite rare (2/200 runs)
  2. Increasing partitions to 16 causes it to hang again…taking a look…

@lidavidm (Member, Author) commented Jan 7, 2022

It's the same hang with 16 partitions, so I think we will need a CloseAsync().

@lidavidm (Member, Author) commented Jan 7, 2022

Ah, the fundamental issue is that S3FS implements writes asynchronously (unless background_writes=False), but our file interfaces are still mostly synchronous, and the dataset writer is asynchronous, using the thread pool to manage parallelism…so we have nested parallelism. Setting background_writes=False fixes it by breaking this loop; the other way is to add at least a CloseAsync().
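
A minimal sketch of the CloseAsync() shape being described (not the s3fs code; the class, the pending_writes_ future, and FinalizeUpload are illustrative): the close waits for background writes by chaining on a future instead of blocking on a condition variable, and the synchronous Close() simply waits on the async version.

#include "arrow/status.h"
#include "arrow/util/future.h"

class BackgroundWriteStream {
 public:
  arrow::Future<> CloseAsync() {
    if (closed_) return arrow::Future<>::MakeFinished(arrow::Status::OK());
    closed_ = true;
    // Complete the close as a continuation of the in-flight background writes.
    return pending_writes_.Then([this] { return FinalizeUpload(); });
  }

  // The blocking variant just delegates, so there is one code path to maintain.
  arrow::Status Close() { return CloseAsync().status(); }

 private:
  arrow::Status FinalizeUpload() { return arrow::Status::OK(); }  // stand-in
  arrow::Future<> pending_writes_ =
      arrow::Future<>::MakeFinished(arrow::Status::OK());
  bool closed_ = false;
};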

@lidavidm lidavidm marked this pull request as ready for review January 10, 2022 18:50
@lidavidm (Member, Author)

Alright, this now also adds CloseAsync() and updates some APIs and tests to match.

@westonpace (Member) left a comment

I know how tricky it is tracking down these race conditions and finding these bugs, so a huge thank you for figuring this all out. I have a few observations, but overall I think everything checks out.

Comment on lines 337 to 296
Future<> CsvFileWriter::FinishInternal() {
  return DeferNotOk(destination_locator_.filesystem->io_context().executor()->Submit(
      [this]() { return batch_writer_->Close(); }));
}
Member

The CSV writer's Close method is a no-op so I think it would be safe to just return a finished future here.

Suggested change:

Before:
Future<> CsvFileWriter::FinishInternal() {
  return DeferNotOk(destination_locator_.filesystem->io_context().executor()->Submit(
      [this]() { return batch_writer_->Close(); }));
}

After:
Future<> CsvFileWriter::FinishInternal() {
  batch_writer_->Close();
  return Future<>::MakeFinished();
}

@@ -1243,6 +1243,48 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::OK();
}

Future<> CloseAsync() override {
Member

There's a lot of overlap between this method and Close. Can we create some helper methods so we have something like (pseudo-code)...

Future<> CloseAsync() override {
  PrepareClose();
  FlushAsync().Then(DoClose);
}

Status Close() override {
  PrepareClose();
  Flush();
  DoClose();
}

Or you could just change Close to be return CloseAsync().status()

Member Author

Ah whoops, I had meant to make Close() just call CloseAsync(), thanks for catching this.

Comment on lines 1338 to 1340
if (closed_) {
  return Status::Invalid("Operation on closed stream");
}
Member

Is this check needed? If closed_ is true then FlushAsync is just going to immediately return a failed future anyway.

Aws::Vector<S3Model::CompletedPart> completed_parts;
int64_t parts_in_progress = 0;
Status status;
Future<> completed = Future<>::MakeFinished(Status::OK());
Member

Nit: completed is a little bit misleading here. A user could write, flush, and continue writing. I'm not sure what a better name would be but maybe something like writes_in_progress?

Member Author

I renamed it pending_parts_completed if that makes more sense.
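
For context, a hedged sketch of how such a pending_parts_completed future might behave (illustrative only, not the s3fs code): it is reset when a background part goes out while none are in flight, and marked finished when the in-progress count drops back to zero, so a flush or close can simply chain on it instead of waiting on a condition variable.

#include <mutex>
#include "arrow/status.h"
#include "arrow/util/future.h"

class PendingParts {
 public:
  // Called when a background upload part is issued.
  void AddPart() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (parts_in_progress_++ == 0) {
      pending_parts_completed_ = arrow::Future<>::Make();  // new flush barrier
    }
  }

  // Called from the upload callback when a part finishes.
  void PartFinished() {
    arrow::Future<> to_finish;
    {
      std::lock_guard<std::mutex> lock(mutex_);
      if (--parts_in_progress_ == 0) {
        to_finish = pending_parts_completed_;
      }
    }
    // Mark outside the lock so continuations do not run under it.
    if (to_finish.is_valid()) to_finish.MarkFinished(arrow::Status::OK());
  }

  // What FlushAsync() could return: already finished when nothing is in flight.
  arrow::Future<> FlushAsync() {
    std::lock_guard<std::mutex> lock(mutex_);
    return pending_parts_completed_;
  }

 private:
  std::mutex mutex_;
  int64_t parts_in_progress_ = 0;
  arrow::Future<> pending_parts_completed_ =
      arrow::Future<>::MakeFinished(arrow::Status::OK());
};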

@lidavidm (Member, Author)

@westonpace I think I've addressed all feedback, any final comments here?

@westonpace (Member)

@lidavidm Those fixes look good, I think this is ready.

@lidavidm lidavidm closed this in bafaa76 Jan 12, 2022
@lidavidm lidavidm deleted the arrow-15265 branch January 12, 2022 20:32
@ursabot commented Jan 12, 2022

Benchmark runs are scheduled for baseline = 0bce440 and contender = bafaa76. bafaa76 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️0.45% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.09% ⬆️0.0%] ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python. Runs only benchmarks with cloud = True
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
