
[R][C++] write_dataset() hangs when pivot_() operation is performed before #35649

Closed
PMassicotte opened this issue May 17, 2023 · 9 comments · Fixed by #36307

Comments

@PMassicotte

Describe the bug, including details regarding any error messages, version, and platform.

I am trying to work on a dataset without having to pull it into memory with collect(). I found that if I use pivot_longer() in the chain of operations, write_dataset() hangs and nothing seems to happen.

library(tidyverse)
library(arrow)


one_level_tree <- tempfile()

mtcars |>
  to_duckdb() |>
  pivot_longer(everything()) |>
  to_arrow() |>
  # collect() |> # collecting makes it work; otherwise, it hangs on write_dataset()
  write_dataset(one_level_tree, partitioning = "name")

list.files(one_level_tree, recursive = TRUE)
r$> sessioninfo::session_info()
─ Session info ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
 setting  value
 version  R version 4.3.0 (2023-04-21)
 os       Ubuntu 23.04
 system   x86_64, linux-gnu
 ui       X11
 language en_CA:en
 collate  en_CA.UTF-8
 ctype    en_CA.UTF-8
 tz       America/Toronto
 date     2023-05-17
 pandoc   NA

─ Packages ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
 package     * version date (UTC) lib source
 arrow       * 12.0.0  2023-05-05 [1] RSPM (R 4.3.0)
 assertthat    0.2.1   2019-03-21 [1] RSPM (R 4.3.0)
 bit           4.0.5   2022-11-15 [1] RSPM (R 4.3.0)
 bit64         4.0.5   2020-08-30 [1] RSPM (R 4.3.0)
 cli           3.6.1   2023-03-23 [1] RSPM (R 4.3.0)
 colorspace    2.1-0   2023-01-23 [1] RSPM (R 4.3.0)
 dplyr       * 1.1.2   2023-04-20 [1] RSPM (R 4.3.0)
 fansi         1.0.4   2023-01-22 [1] RSPM (R 4.3.0)
 forcats     * 1.0.0   2023-01-29 [1] RSPM (R 4.3.0)
 generics      0.1.3   2022-07-05 [1] RSPM (R 4.3.0)
 ggplot2     * 3.4.2   2023-04-03 [1] RSPM (R 4.3.0)
 glue          1.6.2   2022-02-24 [1] RSPM (R 4.3.0)
 gtable        0.3.3   2023-03-21 [1] RSPM (R 4.3.0)
 hms           1.1.3   2023-03-21 [1] RSPM (R 4.3.0)
 httpuv        1.6.11  2023-05-11 [1] RSPM (R 4.3.0)
 jsonlite      1.8.4   2022-12-06 [1] RSPM (R 4.3.0)
 later         1.3.1   2023-05-02 [1] RSPM (R 4.3.0)
 lifecycle     1.0.3   2022-10-07 [1] RSPM (R 4.3.0)
 lubridate   * 1.9.2   2023-02-10 [1] RSPM (R 4.3.0)
 magrittr      2.0.3   2022-03-30 [1] RSPM (R 4.3.0)
 munsell       0.5.0   2018-06-12 [1] RSPM (R 4.3.0)
 pillar        1.9.0   2023-03-22 [1] RSPM (R 4.3.0)
 pkgconfig     2.0.3   2019-09-22 [1] RSPM (R 4.3.0)
 promises      1.2.0.1 2021-02-11 [1] RSPM (R 4.3.0)
 purrr       * 1.0.1   2023-01-10 [1] RSPM (R 4.3.0)
 R6            2.5.1   2021-08-19 [1] RSPM (R 4.3.0)
 Rcpp          1.0.10  2023-01-22 [1] RSPM (R 4.3.0)
 readr       * 2.1.4   2023-02-10 [1] RSPM (R 4.3.0)
 rlang         1.1.1   2023-04-28 [1] RSPM (R 4.3.0)
 scales        1.2.1   2022-08-20 [1] RSPM (R 4.3.0)
 sessioninfo   1.2.2   2021-12-06 [1] RSPM (R 4.3.0)
 stringi       1.7.12  2023-01-11 [1] CRAN (R 4.3.0)
 stringr     * 1.5.0   2022-12-02 [1] RSPM (R 4.3.0)
 tibble      * 3.2.1   2023-03-20 [1] RSPM (R 4.3.0)
 tidyr       * 1.3.0   2023-01-24 [1] RSPM (R 4.3.0)
 tidyselect    1.2.0   2022-10-10 [1] RSPM (R 4.3.0)
 tidyverse   * 2.0.0   2023-02-22 [1] RSPM (R 4.3.0)
 timechange    0.2.0   2023-01-11 [1] RSPM (R 4.3.0)
 tzdb          0.4.0   2023-05-12 [1] RSPM (R 4.3.0)
 utf8          1.2.3   2023-01-31 [1] RSPM (R 4.3.0)
 vctrs         0.6.2   2023-04-19 [1] RSPM (R 4.3.0)
 withr         2.5.0   2022-03-03 [1] RSPM (R 4.3.0)

Component(s)

R

@thisisnic changed the title from "write_dataset() hangs when pivot_() operation is performed before" to "[R] write_dataset() hangs when pivot_() operation is performed before" on May 22, 2023
@thisisnic
Member

I can confirm that this can be replicated in dev Arrow. I get the following error:

Error: IOError: Invalid Error: std::exception
/home/nic2/arrow/cpp/src/arrow/c/bridge.cc:1797  StatusFromCError(stream_.get_next(&stream_, &c_array))
/home/nic2/arrow/cpp/src/arrow/record_batch.h:257  ReadNext(&batch)
/home/nic2/arrow/cpp/src/arrow/util/iterator.h:428  it_.Next()

@thisisnic changed the title from "[R] write_dataset() hangs when pivot_() operation is performed before" to "[R][C++] write_dataset() hangs when pivot_() operation is performed before" on May 30, 2023
@paleolimbot
Member

FWIW, I was only able to replicate this on Linux (macOS/dev Arrow and Windows/release Arrow successfully wrote the dataset). On Linux I got an additional note about C stack usage:

Error: C stack usage  890036266436 is too close to the limit
Error: IOError: Invalid Error: std::exception
/home/dewey/rscratch/arrow/cpp/src/arrow/c/bridge.cc:1797  StatusFromCError(stream_.get_next(&stream_, &c_array))
/home/dewey/rscratch/arrow/cpp/src/arrow/record_batch.h:257  ReadNext(&batch)
/home/dewey/rscratch/arrow/cpp/src/arrow/util/iterator.h:428  it_.Next()

@PMassicotte
Author

Interesting. Do you have an idea of what the origin of the problem might be?

@westonpace
Member

Normally when I see a stack usage warning like that it's because of excessive (or infinite) recursion. However, that doesn't seem to be the case here:

It's a (refreshingly) readable stack trace:

Thread 40 "R" hit Catchpoint 1 (exception thrown), __cxxabiv1::__cxa_throw (obj=0x7fff3114da20, tinfo=0x7fff56e79ac8 <typeinfo for cpp11::unwind_exception>, dest=0x7fff55c8dcb0 <cpp11::unwind_exception::~unwind_exception()>) at ../../../../libstdc++-v3/libsupc++/eh_throw.cc:80
80	../../../../libstdc++-v3/libsupc++/eh_throw.cc: No such file or directory.
(gdb) bt
#0  __cxxabiv1::__cxa_throw (obj=0x7fff3114da20, tinfo=0x7fff56e79ac8 <typeinfo for cpp11::unwind_exception>, dest=0x7fff55c8dcb0 <cpp11::unwind_exception::~unwind_exception()>)
    at ../../../../libstdc++-v3/libsupc++/eh_throw.cc:80
#1  0x00007fff55c961a7 in SEXPREC* cpp11::unwind_protect<cpp11::detail::closure<SEXPREC* (SEXPREC*, SEXPREC*), cpp11::sexp&, SEXPREC*&>, void>(cpp11::detail::closure<SEXPREC* (SEXPREC*, SEXPREC*), cpp11::sexp&, SEXPREC*&>&&) () from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#2  0x00007fff55ca599d in RArrowTabularStreamFactory::Produce(unsigned long, duckdb::ArrowStreamParameters&) () from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#3  0x00007fff565bbd15 in duckdb::ProduceArrowScan(duckdb::ArrowScanFunctionData const&, duckdb::vector<unsigned long, true> const&, duckdb::TableFilterSet*) ()
   from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#4  0x00007fff565bc82c in duckdb::ArrowTableFunction::ArrowScanInitGlobal(duckdb::ClientContext&, duckdb::TableFunctionInitInput&) ()
   from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#5  0x00007fff563a7255 in duckdb::PhysicalTableScan::GetGlobalSourceState(duckdb::ClientContext&) const () from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#6  0x00007fff566e87df in duckdb::Pipeline::ResetSource(bool) () from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#7  0x00007fff566e898b in duckdb::Pipeline::Reset() () from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#8  0x00007fff566f7d29 in duckdb::Executor::NextExecutor() () from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#9  0x00007fff566f8064 in duckdb::Executor::FetchChunk() () from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#10 0x00007fff56642f0e in duckdb::ClientContext::FetchInternal(duckdb::ClientContextLock&, duckdb::Executor&, duckdb::BaseQueryResult&) ()
   from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#11 0x00007fff56642fcf in duckdb::ClientContext::Fetch(duckdb::ClientContextLock&, duckdb::StreamQueryResult&) () from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#12 0x00007fff5664307d in duckdb::StreamQueryResult::FetchRaw() () from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#13 0x00007fff56614a82 in duckdb::QueryResult::Fetch() () from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#14 0x00007fff55d913d5 in duckdb::QueryResult::TryFetch(duckdb::unique_ptr<duckdb::DataChunk, std::default_delete<duckdb::DataChunk>, true>&, duckdb::PreservedError&) ()
   from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#15 0x00007fff55d9857b in duckdb::ArrowUtil::TryFetchChunk(duckdb::QueryResult*, unsigned long, ArrowArray*, unsigned long&, duckdb::PreservedError&) ()
   from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#16 0x00007fff55d989a1 in duckdb::ResultArrowArrayStreamWrapper::MyStreamGetNext(ArrowArrayStream*, ArrowArray*) () from /home/pace/miniconda3/envs/conbench3/lib/R/library/duckdb/libs/duckdb.so
#17 0x00007fff6875961e in arrow::(anonymous namespace)::ArrayStreamBatchReader::ReadNext (this=0x555557df4d50, batch=0x7fff3f7ee7e0) at /home/pace/dev/arrow/cpp/src/arrow/c/bridge.cc:1797
#18 0x00007fff6c264bfb in arrow::RecordBatchReader::Next (this=0x555557df4d50) at /home/pace/dev/arrow/cpp/src/arrow/record_batch.h:257
#19 0x00007fff6c26dd0f in arrow::MakeIteratorFromReader<arrow::RecordBatchReader>(std::shared_ptr<arrow::RecordBatchReader> const&)::{lambda()#1}::operator()() const (__closure=0x55555bbc39d0)
    at /home/pace/dev/arrow/cpp/src/arrow/util/iterator.h:565
#20 0x00007fff6c2863a8 in arrow::FunctionIterator<arrow::MakeIteratorFromReader<arrow::RecordBatchReader>(std::shared_ptr<arrow::RecordBatchReader> const&)::{lambda()#1}, std::shared_ptr<arrow::RecordBatch> >::Next() (this=0x55555bbc39d0) at /home/pace/dev/arrow/cpp/src/arrow/util/iterator.h:346
#21 0x00007fff6c28257f in arrow::Iterator<std::shared_ptr<arrow::RecordBatch> >::Next<arrow::FunctionIterator<arrow::MakeIteratorFromReader<arrow::RecordBatchReader>(std::shared_ptr<arrow::RecordBatchReader> const&)::{lambda()#1}, std::shared_ptr<arrow::RecordBatch> > >(void*) (ptr=0x55555bbc39d0) at /home/pace/dev/arrow/cpp/src/arrow/util/iterator.h:200
#22 0x00007fff6c26d787 in arrow::Iterator<std::shared_ptr<arrow::RecordBatch> >::Next (this=0x55555b96d898) at /home/pace/dev/arrow/cpp/src/arrow/util/iterator.h:110
#23 0x00007fff6c32e09b in arrow::MapIterator<arrow::acero::(anonymous namespace)::RecordBatchReaderSourceNode::MakeGenerator(const std::shared_ptr<arrow::RecordBatchReader>&, arrow::internal::Executor*)::<lambda(const std::shared_ptr<arrow::RecordBatch>&)>, std::shared_ptr<arrow::RecordBatch>, std::optional<arrow::compute::ExecBatch> >::Next(void) (this=0x55555b96d890)
    at /home/pace/dev/arrow/cpp/src/arrow/util/iterator.h:428
#24 0x00007fff6c32cf1d in arrow::Iterator<std::optional<arrow::compute::ExecBatch> >::Next<arrow::MapIterator<arrow::acero::(anonymous namespace)::RecordBatchReaderSourceNode::MakeGenerator(const std::shared_ptr<arrow::RecordBatchReader>&, arrow::internal::Executor*)::<lambda(const std::shared_ptr<arrow::RecordBatch>&)>, std::shared_ptr<arrow::RecordBatch>, std::optional<arrow::compute::ExecBatch> > >(void *) (
    ptr=0x55555b96d890) at /home/pace/dev/arrow/cpp/src/arrow/util/iterator.h:200
#25 0x00007fff6c269a85 in arrow::Iterator<std::optional<arrow::compute::ExecBatch> >::Next (this=0x555558b2fbe0) at /home/pace/dev/arrow/cpp/src/arrow/util/iterator.h:110
#26 0x00007fff6c290c44 in arrow::BackgroundGenerator<std::optional<arrow::compute::ExecBatch> >::WorkerTask (
    state=std::shared_ptr<arrow::BackgroundGenerator<std::optional<arrow::compute::ExecBatch> >::State> (use count 3, weak count 0) = {...}) at /home/pace/dev/arrow/cpp/src/arrow/util/async_generator.h:1759
#27 0x00007fff6c28fd78 in arrow::BackgroundGenerator<std::optional<arrow::compute::ExecBatch> >::State::DoRestartTask(std::shared_ptr<arrow::BackgroundGenerator<std::optional<arrow::compute::ExecBatch> >::State>, arrow::util::Mutex::Guard)::{lambda()#1}::operator()() const (__closure=0x7fff38001ca8) at /home/pace/dev/arrow/cpp/src/arrow/util/async_generator.h:1666
#28 0x00007fff6c293b96 in arrow::internal::FnOnce<void ()>::FnImpl<arrow::BackgroundGenerator<std::optional<arrow::compute::ExecBatch> >::State::DoRestartTask(std::shared_ptr<arrow::BackgroundGenerator<std::optional<arrow::compute::ExecBatch> >::State>, arrow::util::Mutex::Guard)::{lambda()#1}>::invoke() (this=0x7fff38001ca0) at /home/pace/dev/arrow/cpp/src/arrow/util/functional.h:152
#29 0x00007fff688a90a7 in arrow::internal::FnOnce<void ()>::operator()() && (this=0x7fff3f7eec70) at /home/pace/dev/arrow/cpp/src/arrow/util/functional.h:140
#30 0x00007fff688a37c0 in arrow::internal::WorkerLoop (state=std::shared_ptr<arrow::internal::ThreadPool::State> (use count 5, weak count 1) = {...}, it={_M_id = {_M_thread = 140734258734656}})
    at /home/pace/dev/arrow/cpp/src/arrow/util/thread_pool.cc:269
#31 0x00007fff688a46ee in operator() (__closure=0x7fff38001ce8) at /home/pace/dev/arrow/cpp/src/arrow/util/thread_pool.cc:430
#32 0x00007fff688a871e in std::__invoke_impl<void, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> >(std::__invoke_other, struct {...} &&) (__f=...) at /usr/include/c++/11/bits/invoke.h:61
#33 0x00007fff688a86e1 in std::__invoke<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> >(struct {...} &&) (__fn=...) at /usr/include/c++/11/bits/invoke.h:96
#34 0x00007fff688a868e in std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > >::_M_invoke<0>(std::_Index_tuple<0>) (this=0x7fff38001ce8)
    at /usr/include/c++/11/bits/std_thread.h:253
#35 0x00007fff688a8662 in std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > >::operator()(void) (this=0x7fff38001ce8)
    at /usr/include/c++/11/bits/std_thread.h:260
#36 0x00007fff688a8646 in std::thread::_State_impl<std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > > >::_M_run(void) (this=0x7fff38001ce0)
    at /usr/include/c++/11/bits/std_thread.h:211
--Type <RET> for more, q to quit, c to continue without paging--
#37 0x00007ffff4919a93 in std::execute_native_thread_routine (__p=<optimized out>) at ../../../../../libstdc++-v3/src/c++11/thread.cc:82
#38 0x00007ffff7694b43 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#39 0x00007ffff7726a00 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

There are one or two other active threads, and none of them seems to have a deep stack either.

@paleolimbot
Member

I seem to remember that the input to DuckDB is something along the lines of "all the R objects needed to initialize a scan". It seems like the call to initialize the scan may be happening on another thread... maybe the pivot operation requires more than one scan?

When R code gets called from another thread (outside the Arrow R package, where we do this safely), the failure mode can be wildly variable.

@westonpace
Member

Yes, I think this is an example of a non-R thread calling R code. This is an Arrow worker thread running the query. The Arrow worker is calling DuckDB and asking for the next batch. DuckDB is then calling into R and asking for the next batch.

Perhaps DuckDB needs something like SafeCallIntoR, but that wouldn't be trivial, as you probably know better than I do.

If Acero knew that the source was going to be calling into R then Acero could use SafeCallIntoR to fetch the next batch but, from Acero's perspective, it's just grabbing a batch from "some kind of C data producer".

Another possibility would be to run the entire Acero plan synchronously (both I/O threads and CPU threads). Acero would never spawn a thread, so you'd always be on the R thread, but that would tank performance (especially running the I/O tasks synchronously).

@paleolimbot
Member

> Perhaps DuckDB needs something like SafeCallIntoR, but that wouldn't be trivial, as you probably know better than I do.

There's unfortunately no universal solution to this at the moment, but it's definitely the long-term solution for something like this. At the very least, duckdb could check whether it's on the main R thread and error otherwise (rather than hang or crash).
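
A minimal sketch of what such a guard could look like, purely illustrative (where the thread id is captured and how the error is surfaced are assumptions, not how duckdb does or would implement it):

```cpp
// Illustrative sketch only: detect "called from a non-main thread" and fail
// loudly instead of evaluating R code there. Capturing the thread id when the
// shared library is loaded (which happens on the main R thread) is an
// assumption about how a package might record it.
#include <thread>

namespace {
// Captured once, at load time, on the main R thread.
const std::thread::id kMainRThreadId = std::this_thread::get_id();
}  // namespace

bool CalledFromMainRThread() {
  return std::this_thread::get_id() == kMainRThreadId;
}

// In a stream callback that would otherwise evaluate R code:
//   if (!CalledFromMainRThread()) {
//     // return an error status instead of calling into R (avoids the hang/crash)
//   }
```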

> If Acero knew that the source was going to be calling into R then Acero could use SafeCallIntoR to fetch the next batch but, from Acero's perspective, it's just grabbing a batch from "some kind of C data producer".

I think we might be able to make this one happen...this is basically wrapping a RecordBatchReader with SafeCallIntoR<RecordBatch>(reader.ReadNext()).
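
A rough sketch of that wrapping idea, with a hypothetical class name and an assumed SafeCallIntoR signature (a callable returning arrow::Result<T> plus a reason string, itself returning arrow::Result<T>); the real helper in the arrow R package may differ:

```cpp
// Sketch only: hypothetical wrapper that routes ReadNext() through the main
// R thread so the inner reader (e.g. a DuckDB-backed stream) never evaluates
// R code from an Arrow worker thread. The SafeCallIntoR signature used here
// is an assumption, not necessarily the one in the arrow R package.
#include <memory>
#include <utility>

#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/status.h>

class MainRThreadBatchReader : public arrow::RecordBatchReader {
 public:
  explicit MainRThreadBatchReader(std::shared_ptr<arrow::RecordBatchReader> reader)
      : reader_(std::move(reader)) {}

  std::shared_ptr<arrow::Schema> schema() const override { return reader_->schema(); }

  arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override {
    // Run the possibly R-evaluating ReadNext() on the main R thread.
    ARROW_ASSIGN_OR_RAISE(
        *batch,
        SafeCallIntoR<std::shared_ptr<arrow::RecordBatch>>(
            [this]() -> arrow::Result<std::shared_ptr<arrow::RecordBatch>> {
              std::shared_ptr<arrow::RecordBatch> out;
              ARROW_RETURN_NOT_OK(reader_->ReadNext(&out));
              return out;
            },
            "ReadNext() on a reader that may call into R"));
    return arrow::Status::OK();
  }

 private:
  std::shared_ptr<arrow::RecordBatchReader> reader_;
};
```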

...given that literally all of those things are my fault, I'm happy to take a stab at fixing!

@westonpace
Member

> I think we might be able to make this one happen...this is basically wrapping a RecordBatchReader with SafeCallIntoR<RecordBatch>(reader.ReadNext()).

Ah, yes, that makes sense. R is the one providing the C data stream source to Acero, so I suppose you have the power to wrap it. And it makes sense for you to assume that whatever it is will make R calls.

> ...given that literally all of those things are my fault, I'm happy to take a stab at fixing!

I think we can place a healthy amount of the blame both on myself and on R's ambitious threading model 😆

@paleolimbot
Member

> R's ambitious threading model

Or severe lack of ambition, as it may be.

thisisnic pushed a commit that referenced this issue Jun 29, 2023
… from the main R thread (#36307)

### Rationale for this change

When a DuckDB result whose input was an Arrow dataset is passed back to Arrow via `to_arrow()`, some DuckDB operations can call R code from other threads. This caused a crash or hang on Linux when attempting to combine `pivot_longer()` and `write_dataset()`.

### What changes are included in this PR?

- Added a wrapper class around the `RecordBatchReader` that routes calls to `ReadNext()` through `SafeCallIntoR()`.

### Are these changes tested?

I can't find a new case that isn't covered by our existing tests, although I did remove a skip that was causing a similar problem at one point (#33033). Because it's difficult to predict/test where duckdb evaluates R code, it's hard to know exactly what to test here (I would have expected R code to be evaluated/a crash to occur with many of our existing tests, but even the `pivot_longer()` example does not crash on MacOS and Windows 🤷 ).

I did verify on Ubuntu 22.04 that the reprex kindly provided by @PMassicotte errors before this PR and does not error after this PR:

```r
library(tidyverse)
library(arrow)

one_level_tree <- tempfile()

mtcars |>
  to_duckdb() |>
  pivot_longer(everything()) |>
  to_arrow() |>
  # collect() |> # collecting makes it work; otherwise, it hangs on write_dataset()
  write_dataset(one_level_tree, partitioning = "name")

list.files(one_level_tree, recursive = TRUE)
```

### Are there any user-facing changes?

There are no user-facing changes.
* Closes: #35649

Authored-by: Dewey Dunnington <dewey@voltrondata.com>
Signed-off-by: Nic Crane <thisisnic@gmail.com>
@thisisnic thisisnic added this to the 13.0.0 milestone Jun 29, 2023