
[Ruby] Random segfault in hash join #35819

Closed
stenlarsson opened this issue May 30, 2023 · 10 comments · Fixed by #35963
@stenlarsson (Contributor)
Describe the bug, including details regarding any error messages, version, and platform.

We have some (complicated) Ruby code that joins a lot of tables, and it randomly crashes. Sometimes you get a stack trace, but sometimes the Ruby crash handler in turn crashes. This indicates that garbage has been written to the stack. I have also seen cases where it doesn't crash, but there is some random garbage in the result.

I installed the libarrow-acero1200-dbgsym and libarrow1200-dbgsym packages in our Docker image, and ran Ruby through GDB. This way I'm (sometimes) able to capture a stack trace. Here is one example:

#0  arrow::compute::ExecBatchBuilder::Visit<arrow::compute::ExecBatchBuilder::AppendSelected(const std::shared_ptr<arrow::ArrayData>&, arrow::compute::ResizableArrayData*, int, const uint16_t*, arrow::MemoryPool*)::<lambda(int, const uint8_t*, uint32_t)> > (process_value_fn=..., row_ids=0xfffeed18d3b8, num_rows=1024, column=<optimized out>) at ./cpp/src/arrow/compute/light_array.cc:467
#1  arrow::compute::ExecBatchBuilder::AppendSelected(std::shared_ptr<arrow::ArrayData> const&, arrow::compute::ResizableArrayData*, int, unsigned short const*, arrow::MemoryPool*)
    (source=std::shared_ptr<arrow::ArrayData> (use count 2, weak count 0) = {...}, target=0xffff503d2ae0, num_rows_to_append=1024, row_ids=0xfffeed18d3b8, pool=<optimized out>) at ./cpp/src/arrow/compute/light_array.cc:578
#2  0x0000ffff89ca4834 in arrow::compute::ExecBatchBuilder::AppendSelected(arrow::MemoryPool*, arrow::compute::ExecBatch const&, int, unsigned short const*, int, int const*) (this=this@entry=0xaaab2cb144d0, pool=
    0xffff8b339360 <arrow::global_state+88>, batch=..., num_rows_to_append=num_rows_to_append@entry=1024, row_ids=0xfffeed18d3b8, num_cols=<optimized out>, col_ids=0xaaab2cb15550) at /usr/include/c++/10/bits/stl_vector.h:1043
#3  0x0000ffff89381858 in arrow::acero::JoinResultMaterialize::AppendProbeOnly(arrow::compute::ExecBatch const&, int, unsigned short const*, int*)
    (this=0xaaab2cb14480, key_and_payload=<optimized out>, num_rows_to_append=1024, row_ids=<optimized out>, num_rows_appended=0xffff80d44124) at ./cpp/src/arrow/acero/swiss_join.cc:1587
#4  0x0000ffff89382434 in operator() (num_rows_appended=0xffff80d44124, offset=0, num_rows_to_append_left=1024, this=<optimized out>) at ./cpp/src/arrow/acero/swiss_join_internal.h:586
#5  arrow::acero::JoinResultMaterialize::AppendAndOutput<arrow::acero::JoinResultMaterialize::AppendProbeOnly<arrow::acero::JoinProbeProcessor::OnNextBatch(int64_t, const arrow::compute::ExecBatch&, arrow::util::TempVectorStack*, std::vector<arrow::compute::KeyColumnArray>*)::<lambda(arrow::compute::ExecBatch)> >::<lambda(int, int, int*)>, arrow::acero::JoinProbeProcessor::OnNextBatch(int64_t, const arrow::compute::ExecBatch&, arrow::util::TempVectorStack*, std::vector<arrow::compute::KeyColumnArray>*)::<lambda(arrow::compute::ExecBatch)> > (output_batch_fn=<synthetic pointer>..., append_rows_fn=<optimized out>, num_rows_to_append=1024, this=0xaaab2cb14480) at ./cpp/src/arrow/acero/swiss_join_internal.h:567
#6  arrow::acero::JoinResultMaterialize::AppendProbeOnly<arrow::acero::JoinProbeProcessor::OnNextBatch(int64_t, const arrow::compute::ExecBatch&, arrow::util::TempVectorStack*, std::vector<arrow::compute::KeyColumnArray>*)::<lambda(arrow::compute::ExecBatch)> > (output_batch_fn=..., row_ids=<optimized out>, num_rows_to_append=1024, key_and_payload=..., this=0xaaab2cb14480) at ./cpp/src/arrow/acero/swiss_join_internal.h:590
#7  arrow::acero::JoinProbeProcessor::OnNextBatch(long, arrow::compute::ExecBatch const&, arrow::util::TempVectorStack*, std::vector<arrow::compute::KeyColumnArray, std::allocator<arrow::compute::KeyColumnArray> >*)
    (this=0xaaab2cae23f0, thread_id=4, keypayload_batch=..., temp_stack=0xaaab2cb13f68, temp_column_arrays=0xaaab2cb14538) at ./cpp/src/arrow/acero/swiss_join.cc:2019
#8  0x0000ffff893886f0 in arrow::acero::SwissJoin::ProbeSingleBatch(unsigned long, arrow::compute::ExecBatch) (this=0xaaab2cae1d10, thread_index=4, batch=...) at ./cpp/src/arrow/acero/swiss_join.cc:2145
#9  0x0000ffff8934be7c in arrow::acero::HashJoinNode::OnProbeSideBatch(unsigned long, arrow::compute::ExecBatch) (this=this@entry=0xaaab2ca11800, thread_index=thread_index@entry=4, batch=...) at ./cpp/src/arrow/acero/hash_join_node.cc:818
#10 0x0000ffff8934c284 in arrow::acero::HashJoinNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch) (this=0xaaab2ca11800, input=0xaaab2ca10ae0, batch=...) at ./cpp/src/arrow/acero/hash_join_node.cc:891
#11 0x0000ffff893716f4 in operator()() const (__closure=0xffff3c51bd20) at ./cpp/src/arrow/acero/source_node.cc:119
#12 0x0000ffff893718d0 in std::__invoke_impl<arrow::Status, arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(const arrow::compute::ExecBatch&)::<lambda()>&> (__f=<optimized out>) at /usr/include/c++/10/bits/invoke.h:59
#13 std::__invoke_r<arrow::Status, arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(const arrow::compute::ExecBatch&)::<lambda()>&> (__fn=<optimized out>) at /usr/include/c++/10/bits/invoke.h:115
#14 std::_Function_handler<arrow::Status(), arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(const arrow::compute::ExecBatch&)::<lambda()> >::_M_invoke(const std::_Any_data &) (__functor=<optimized out>)
    at /usr/include/c++/10/bits/std_function.h:292
#15 0x0000ffff89365fd8 in std::function<arrow::Status ()>::operator()() const (this=<optimized out>) at /usr/include/c++/10/bits/std_function.h:622
#16 arrow::detail::ContinueFuture::operator()<std::function<arrow::Status ()>&, , arrow::Status, arrow::Future<arrow::internal::Empty> >(arrow::Future<arrow::internal::Empty>, std::function<arrow::Status ()>&) const
    (this=<optimized out>, f=<optimized out>, next=...) at ./cpp/src/arrow/util/future.h:150
#17 std::__invoke_impl<void, arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status ()>&>(std::__invoke_other, arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status ()>&) (__f=<optimized out>) at /usr/include/c++/10/bits/invoke.h:60
#18 std::__invoke<arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status ()>&>(arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status ()>&)
    (__fn=<optimized out>) at /usr/include/c++/10/bits/invoke.h:95
#19 std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, std::function<arrow::Status ()>)>::__call<void, , 0ul, 1ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul>) (__args=<optimized out>, this=<optimized out>)
    at /usr/include/c++/10/functional:416
#20 std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, std::function<arrow::Status ()>)>::operator()<, void>() (this=<optimized out>) at /usr/include/c++/10/functional:499
#21 arrow::internal::FnOnce<void ()>::FnImpl<std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, std::function<arrow::Status ()>)> >::invoke() (this=<optimized out>) at ./cpp/src/arrow/util/functional.h:152
#22 0x0000ffff89b3cf38 in arrow::internal::FnOnce<void ()>::operator()() && (this=0xffff80d44968) at /usr/include/c++/10/bits/unique_ptr.h:172
#23 arrow::internal::WorkerLoop (it=Python Exception <class 'gdb.error'> value has been optimized out:
{_M_id = {_M_thread = 281472843141584}}, state=) at ./cpp/src/arrow/util/thread_pool.cc:269
#24 operator() (__closure=<optimized out>) at ./cpp/src/arrow/util/thread_pool.cc:430
#25 std::__invoke_impl<void, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > (__f=<optimized out>) at /usr/include/c++/10/bits/invoke.h:60
#26 std::__invoke<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > (__fn=<optimized out>) at /usr/include/c++/10/bits/invoke.h:95
#27 std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > >::_M_invoke<0> (this=<optimized out>) at /usr/include/c++/10/thread:264
#28 std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > >::operator() (this=<optimized out>) at /usr/include/c++/10/thread:271
#29 std::thread::_State_impl<std::thread::_Invoker<std::tuple<arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::<lambda()> > > >::_M_run(void) (this=<optimized out>) at /usr/include/c++/10/thread:215
#30 0x0000ffff89164cac in  () at /usr/lib/aarch64-linux-gnu/libstdc++.so.6
#31 0x0000ffff9018f648 in start_thread (arg=0xffff80d44ad0) at pthread_create.c:477
#32 0x0000ffff8ff3dfdc in thread_start () at ../sysdeps/unix/sysv/linux/aarch64/clone.S:78

Because this is so random I have unfortunately not been able to create a small reproducible test that I can share with you. I tried setting OMP_NUM_THREADS=1, but I couldn't get it to crash. Please let me know if there is anything more I can do to troubleshoot this.

This problem does not happen in Arrow 10, but I have been able to reproduce it in both Arrow 11 and 12, on both amd64 and arm64, and on both macOS and Linux.

Component(s)

Ruby

@kou kou changed the title Random segfault in hash join [Ruby] Random segfault in hash join May 30, 2023
@kou (Member) commented May 30, 2023

Hmm. I want to check whether this is related to Ruby's GC or not.

Could you try with GC.disable?
(Note that your script will use a lot of memory with GC.disable.)
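kou's suggestion above can be wrapped in a small harness so GC is reliably restored after the run. This is a minimal sketch; the workload inside the block is a placeholder, not the reporter's actual join code:

```ruby
# Run a block with Ruby's GC disabled, then restore the previous state.
# NOTE: memory use grows unchecked while GC is off.
def with_gc_disabled
  was_disabled = GC.disable  # returns true if GC was already disabled
  yield
ensure
  GC.enable unless was_disabled
end

result = with_gc_disabled do
  # Placeholder for the join-heavy workload being debugged.
  (1..5).map { |i| i * i }
end
```

If the crash disappears with GC off, the native code is likely reading memory that Ruby's GC reclaimed.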

@stenlarsson (Contributor, issue author)
It was harder to make it crash with GC.disable, but I managed to do it in my macOS development environment. I don't think you can install debug symbols with Homebrew, sorry.

* thread #11, stop reason = EXC_BAD_ACCESS (code=1, address=0x31d4d0020)
  * frame #0: 0x0000000129932f74 libarrow.1200.0.0.dylib`arrow::util::TempVectorStack::alloc(unsigned int, unsigned char**, int*) + 120
    frame #1: 0x0000000129926afc libarrow.1200.0.0.dylib`arrow::compute::KeyCompare::CompareColumnsToRows(unsigned int, unsigned short const*, unsigned int const*, arrow::compute::LightContext*, unsigned int*, unsigned short*, std::__1::vector<arrow::compute::KeyColumnArray, std::__1::allocator<arrow::compute::KeyColumnArray> > const&, arrow::compute::RowTableImpl const&, bool, unsigned char*) + 132
    frame #2: 0x000000012808a230 libarrow_acero.1200.0.0.dylib`arrow::acero::RowArray::Compare(arrow::compute::ExecBatch const&, int, int, int, unsigned short const*, unsigned int const*, unsigned int*, unsigned short*, long long, arrow::util::TempVectorStack*, std::__1::vector<arrow::compute::KeyColumnArray, std::__1::allocator<arrow::compute::KeyColumnArray> >&, unsigned char*) + 144
    frame #3: 0x000000012808c91c libarrow_acero.1200.0.0.dylib`arrow::acero::SwissTableWithKeys::EqualCallback(int, unsigned short const*, unsigned int const*, unsigned int*, unsigned short*, void*) + 260
    frame #4: 0x0000000129871cb4 libarrow.1200.0.0.dylib`std::__1::function<void (int, unsigned short const*, unsigned int const*, unsigned int*, unsigned short*, void*)>::operator()(int, unsigned short const*, unsigned int const*, unsigned int*, unsigned short*, void*) const + 72
    frame #5: 0x0000000129872004 libarrow.1200.0.0.dylib`arrow::compute::SwissTable::find(int, unsigned int const*, unsigned char*, unsigned char const*, unsigned int*, arrow::util::TempVectorStack*, std::__1::function<void (int, unsigned short const*, unsigned int const*, unsigned int*, unsigned short*, void*)> const&, void*) const + 832
    frame #6: 0x000000012808ce38 libarrow_acero.1200.0.0.dylib`arrow::acero::SwissTableWithKeys::Map(arrow::acero::SwissTableWithKeys::Input*, bool, unsigned int const*, unsigned char*, unsigned int*) + 528
    frame #7: 0x000000012808cc08 libarrow_acero.1200.0.0.dylib`arrow::acero::SwissTableWithKeys::MapReadOnly(arrow::acero::SwissTableWithKeys::Input*, unsigned int const*, unsigned char*, unsigned int*) + 36
    frame #8: 0x000000012808f638 libarrow_acero.1200.0.0.dylib`arrow::acero::JoinProbeProcessor::OnNextBatch(long long, arrow::compute::ExecBatch const&, arrow::util::TempVectorStack*, std::__1::vector<arrow::compute::KeyColumnArray, std::__1::allocator<arrow::compute::KeyColumnArray> >*) + 560
    frame #9: 0x0000000128091170 libarrow_acero.1200.0.0.dylib`arrow::acero::SwissJoin::ProbeSingleBatch(unsigned long, arrow::compute::ExecBatch) + 540
    frame #10: 0x000000012806bc1c libarrow_acero.1200.0.0.dylib`arrow::acero::HashJoinNode::OnProbeSideBatch(unsigned long, arrow::compute::ExecBatch) + 216
    frame #11: 0x000000012806ae40 libarrow_acero.1200.0.0.dylib`arrow::acero::HashJoinNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch) + 364
    frame #12: 0x000000012806e2d4 libarrow_acero.1200.0.0.dylib`arrow::acero::HashJoinNode::OutputBatchCallback(arrow::compute::ExecBatch) + 100
    frame #13: 0x000000012806e1e0 libarrow_acero.1200.0.0.dylib`arrow::acero::HashJoinNode::Init()::'lambda'(long long, arrow::compute::ExecBatch)::operator()(long long, arrow::compute::ExecBatch) const + 116
    frame #14: 0x000000012806e0e0 libarrow_acero.1200.0.0.dylib`decltype(std::declval<arrow::acero::HashJoinNode::Init()::'lambda'(long long, arrow::compute::ExecBatch)&>()(std::declval<long long>(), std::declval<arrow::compute::ExecBatch>())) std::__1::__invoke[abi:v15006]<arrow::acero::HashJoinNode::Init()::'lambda'(long long, arrow::compute::ExecBatch)&, long long, arrow::compute::ExecBatch>(arrow::acero::HashJoinNode::Init()::'lambda'(long long, arrow::compute::ExecBatch)&, long long&&, arrow::compute::ExecBatch&&) + 84
    frame #15: 0x000000012808f8f4 libarrow_acero.1200.0.0.dylib`arrow::acero::JoinProbeProcessor::OnNextBatch(long long, arrow::compute::ExecBatch const&, arrow::util::TempVectorStack*, std::__1::vector<arrow::compute::KeyColumnArray, std::__1::allocator<arrow::compute::KeyColumnArray> >*) + 1260
    frame #16: 0x0000000128091170 libarrow_acero.1200.0.0.dylib`arrow::acero::SwissJoin::ProbeSingleBatch(unsigned long, arrow::compute::ExecBatch) + 540
    frame #17: 0x000000012806bc1c libarrow_acero.1200.0.0.dylib`arrow::acero::HashJoinNode::OnProbeSideBatch(unsigned long, arrow::compute::ExecBatch) + 216
    frame #18: 0x000000012806ae40 libarrow_acero.1200.0.0.dylib`arrow::acero::HashJoinNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch) + 364
    frame #19: 0x000000012806e2d4 libarrow_acero.1200.0.0.dylib`arrow::acero::HashJoinNode::OutputBatchCallback(arrow::compute::ExecBatch) + 100
    frame #20: 0x000000012806e1e0 libarrow_acero.1200.0.0.dylib`arrow::acero::HashJoinNode::Init()::'lambda'(long long, arrow::compute::ExecBatch)::operator()(long long, arrow::compute::ExecBatch) const + 116
    frame #21: 0x000000012806e0e0 libarrow_acero.1200.0.0.dylib`decltype(std::declval<arrow::acero::HashJoinNode::Init()::'lambda'(long long, arrow::compute::ExecBatch)&>()(std::declval<long long>(), std::declval<arrow::compute::ExecBatch>())) std::__1::__invoke[abi:v15006]<arrow::acero::HashJoinNode::Init()::'lambda'(long long, arrow::compute::ExecBatch)&, long long, arrow::compute::ExecBatch>(arrow::acero::HashJoinNode::Init()::'lambda'(long long, arrow::compute::ExecBatch)&, long long&&, arrow::compute::ExecBatch&&) + 84
    frame #22: 0x000000012808f8f4 libarrow_acero.1200.0.0.dylib`arrow::acero::JoinProbeProcessor::OnNextBatch(long long, arrow::compute::ExecBatch const&, arrow::util::TempVectorStack*, std::__1::vector<arrow::compute::KeyColumnArray, std::__1::allocator<arrow::compute::KeyColumnArray> >*) + 1260
    frame #23: 0x0000000128091170 libarrow_acero.1200.0.0.dylib`arrow::acero::SwissJoin::ProbeSingleBatch(unsigned long, arrow::compute::ExecBatch) + 540
    frame #24: 0x000000012806bc1c libarrow_acero.1200.0.0.dylib`arrow::acero::HashJoinNode::OnProbeSideBatch(unsigned long, arrow::compute::ExecBatch) + 216
    frame #25: 0x000000012806ae40 libarrow_acero.1200.0.0.dylib`arrow::acero::HashJoinNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch) + 364
    frame #26: 0x000000012806e2d4 libarrow_acero.1200.0.0.dylib`arrow::acero::HashJoinNode::OutputBatchCallback(arrow::compute::ExecBatch) + 100
    frame #27: 0x000000012806e1e0 libarrow_acero.1200.0.0.dylib`arrow::acero::HashJoinNode::Init()::'lambda'(long long, arrow::compute::ExecBatch)::operator()(long long, arrow::compute::ExecBatch) const + 116
    frame #28: 0x000000012806e0e0 libarrow_acero.1200.0.0.dylib`decltype(std::declval<arrow::acero::HashJoinNode::Init()::'lambda'(long long, arrow::compute::ExecBatch)&>()(std::declval<long long>(), std::declval<arrow::compute::ExecBatch>())) std::__1::__invoke[abi:v15006]<arrow::acero::HashJoinNode::Init()::'lambda'(long long, arrow::compute::ExecBatch)&, long long, arrow::compute::ExecBatch>(arrow::acero::HashJoinNode::Init()::'lambda'(long long, arrow::compute::ExecBatch)&, long long&&, arrow::compute::ExecBatch&&) + 84
    frame #29: 0x00000001280904c8 libarrow_acero.1200.0.0.dylib`arrow::acero::JoinProbeProcessor::OnFinished() + 316
    frame #30: 0x0000000128094468 libarrow_acero.1200.0.0.dylib`arrow::acero::SwissJoin::OnScanHashTableFinished() + 52
    frame #31: 0x0000000128094d40 libarrow_acero.1200.0.0.dylib`arrow::acero::SwissJoin::StartScanHashTable(long long) + 148
    frame #32: 0x0000000128091338 libarrow_acero.1200.0.0.dylib`arrow::acero::SwissJoin::ProbingFinished(unsigned long) + 48
    frame #33: 0x000000012806af40 libarrow_acero.1200.0.0.dylib`arrow::acero::HashJoinNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch) + 620
    frame #34: 0x00000001280867e0 libarrow_acero.1200.0.0.dylib`std::__1::__function::__func<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'(), std::__1::allocator<arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()>, arrow::Status ()>::operator()() + 500
    frame #35: 0x000000012807c940 libarrow_acero.1200.0.0.dylib`std::__1::enable_if<!std::is_void<arrow::Status>::value && !is_future<arrow::Status>::value && (!arrow::Future<arrow::internal::Empty>::is_empty || std::is_same<arrow::Status, arrow::Status>::value), void>::type arrow::detail::ContinueFuture::operator()<std::__1::function<arrow::Status ()>&, arrow::Status, arrow::Future<arrow::internal::Empty> >(arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()>&) const + 44
    frame #36: 0x000000012807c8bc libarrow_acero.1200.0.0.dylib`std::__1::__bind_return<arrow::detail::ContinueFuture, std::__1::tuple<arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()> >, std::__1::tuple<>, __is_valid_bind_return<arrow::detail::ContinueFuture, std::__1::tuple<arrow::Future<arrow::internal::Empty>, std::__1::function<arrow::Status ()> >, std::__1::tuple<> >::value>::type std::__1::__bind<arrow::detail::ContinueFuture, arrow::Future<arrow::internal::Empty>&, std::__1::function<arrow::Status ()> >::operator()[abi:v15006]<>() + 52
    frame #37: 0x0000000129764b18 libarrow.1200.0.0.dylib`void* std::__1::__thread_proxy[abi:v15006]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, arrow::internal::ThreadPool::LaunchWorkersUnlocked(int)::$_6> >(void*) + 392
    frame #38: 0x0000000198153fa8 libsystem_pthread.dylib`_pthread_start + 148

It is still very random, and with OMP_NUM_THREADS=1 I've never gotten it to crash.

@kou (Member) commented Jun 1, 2023

Thanks. Then it seems that this isn't related to Ruby; it's probably related to Acero.

@westonpace Have you seen this problem? (A hash join with multiple threads crashes.)

@stenlarsson (Contributor, issue author)
I refactored our code to call Arrow::Table#join for each table instead of creating an execution plan with multiple joins in it, and it is no longer crashing.
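The refactoring described above amounts to folding `Arrow::Table#join` over the tables instead of wiring one big execution plan. A sketch of the shape, using a stub table so the fold is visible without red-arrow installed (the real code would call red-arrow's `Arrow::Table#join`; the exact keyword options there may differ):

```ruby
# Chain left outer joins: ((base JOIN t1) JOIN t2) JOIN ...
# Each `join` call materializes an intermediate table, unlike a single
# multi-join execution plan.
def chain_left_outer_joins(base, tables, key)
  tables.reduce(base) do |left, right|
    left.join(right, key, type: :left_outer)
  end
end

# Stub standing in for Arrow::Table, just to demonstrate the call pattern.
StubTable = Struct.new(:name) do
  def join(right, key, type:)
    StubTable.new("(#{name} JOIN #{right.name} ON #{key}, #{type})")
  end
end

result = chain_left_outer_joins(StubTable.new("a"),
                                [StubTable.new("b"), StubTable.new("c")],
                                "col1")
```

Materializing each intermediate table keeps every input reachable from Ruby between joins, which is consistent with the crash being a premature-collection issue.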

I noticed that in 1c97ab0 you have added a call to share_input. We are not doing this in our code, so it suggests that it really is an issue with garbage collection. Since we are joining multiple tables, share_input wouldn't really work. You normally don't have to deal with such things in Ruby. 😓

However I did manage to crash it with GC.disable, so I'm not sure what is going on. 🤔

@westonpace (Member)
@westonpace Have you seen this problem? (Joining with multiple threads is crashed.)

I'm not aware of any current issues like this. That being said, the join code is pretty complex, so I would not be surprised if it was the culprit. Getting some kind of reproducer will be essential though.

@stenlarsson , you mention that you have several joins. Are they chained together or parallel? For example, chained would be "join a and b and then join c to the result and then join d to the result" and parallel would be something like "join a and b and also join c and d and then join the two results together".

Also, what kinds of joins are these? Are they left inner joins?

@stenlarsson (Contributor, issue author)
They are chained left outer joins.

@kou (Member) commented Jun 3, 2023

I noticed that in 1c97ab0 you have added a call to share_input. We are not doing this in our code, so it suggests that it really is an issue with garbage collection. Since we are joining multiple tables, share_input wouldn't really work. You normally don't have to deal with such things in Ruby. 😓

I don't understand the problem you're describing. Could you provide a small script that shows the problem?

@stenlarsson (Contributor, issue author)
Here is an example:

```ruby
require 'arrow'

def build_join(plan)
  left_array = Arrow::Int64DataType.new.build_array(0..10)
  left_table = Arrow::Table.new('col1' => left_array)
  left_node = plan.build_source_node(left_table)
  right_array = Arrow::Int64DataType.new.build_array(0..5)
  right_table = Arrow::Table.new('col2' => right_array)
  right_node = plan.build_source_node(right_table)
  plan.build_hash_join_node(left_node, right_node, Arrow::HashJoinNodeOptions.new(:left_outer, ['col1'], ['col2']))
end

def get_result(plan, node)
  sink_node_options = Arrow::SinkNodeOptions.new
  plan.build_sink_node(node, sink_node_options)
  plan.validate
  plan.start
  plan.wait
  reader = sink_node_options.get_reader(node.output_schema)
  reader.read_all
end

plan = Arrow::ExecutePlan.new
node = build_join(plan)
GC.start
result = get_result(plan, node)
p result
```

This prints an empty table on my computer (!?), but if I remove the line with GC.start it works. I said earlier that share_input wouldn't really work, but it would. It is just that the code would need to be refactored in an awkward way to get access to left_table and right_table.
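The failure mode in the script above is that left_table and right_table become unreachable once build_join returns, so GC.start can collect them while the native execution plan still reads them. A minimal sketch of the keep-alive idea, using a hypothetical PlanDependencies container (not part of red-arrow; the stand-in objects are plain Ruby objects, not Arrow tables):

```ruby
# Hypothetical keep-alive pattern: hold Ruby references to every object
# the native plan depends on, for as long as the plan itself is alive.
class PlanDependencies
  def initialize
    @refs = []
  end

  # Register an object so it stays reachable (and thus un-collected)
  # until this container is itself garbage.
  def keep(obj)
    @refs << obj
    obj
  end

  def size
    @refs.size
  end
end

deps = PlanDependencies.new
left_table = deps.keep(Object.new)   # stand-ins for the Arrow tables
right_table = deps.keep(Object.new)
GC.start                             # both remain reachable via `deps`
```

The fix in GH-35963 applies the same idea inside the bindings themselves, so user code does not need to hold these references manually.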

kou added a commit to kou/arrow that referenced this issue Jun 7, 2023
…ePlan

If we don't refer them, GC may free them unexpectedly.

Relations:

* `GArrowExecutePlan` -> `GArrowExecuteNode`s
* `GArrowExecuteNode` -> `GArrowExecuteOptions`
* `GArrowSourceNodeOptions` -> `GArrowRecordBatchReader` or `GArrowRecordBatch`
* `GArrowRecordBatchReader` -> `GArrowRecordBatch`s or `GArrowTable`
@kou (Member) commented Jun 7, 2023

Thanks.
It's not related to share_input. This is not a use case for share_input.

GH-35963 will fix this.

@stenlarsson (Contributor, issue author)
Great news, thanks!

kou added a commit to kou/arrow that referenced this issue Jun 8, 2023
…ePlan
@kou kou closed this as completed in #35963 Jun 9, 2023
kou added a commit that referenced this issue Jun 9, 2023
…35963)

### Rationale for this change

If we don't refer them, GC may free them unexpectedly.

Relations:

* `GArrowExecutePlan` -> `GArrowExecuteNode`s
* `GArrowExecuteNode` -> `GArrowExecuteOptions`
* `GArrowSourceNodeOptions` -> `GArrowRecordBatchReader` or `GArrowRecordBatch`
* `GArrowRecordBatchReader` -> `GArrowRecordBatch`s or `GArrowTable`

### What changes are included in this PR?

Add missing references in GLib and mark the dependency container explicitly in Ruby, because we can't mark the dependency container automatically in Ruby.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* Closes: #35819

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
@kou kou added this to the 13.0.0 milestone Jun 9, 2023