[C++] Occasional TSAN failure on asof-join-node-test #35868

Closed
pitrou opened this issue Jun 1, 2023 · 1 comment · Fixed by #35904


pitrou commented Jun 1, 2023

Describe the bug, including details regarding any error messages, version, and platform.

Example here:
https://github.com/ursacomputing/crossbow/actions/runs/5142492846/jobs/9256298904#step:6:5567

Snippet in case the logs disappear:


[ RUN      ] AsofJoinNodeTest/AsofJoinBasicTest.TestBasic4Backward/2
==================
WARNING: ThreadSanitizer: data race (pid=13514)
  Write of size 8 at 0x7b2c000021f0 by main thread:
    #0 arrow::acero::KeyHasher::Invalidate() /arrow/cpp/src/arrow/acero/asof_join_node.cc:499:12 (libarrow_acero.so.1300+0x3f4289)
    #1 arrow::acero::InputState::Push(std::shared_ptr<arrow::RecordBatch> const&) /arrow/cpp/src/arrow/acero/asof_join_node.cc:901:20 (libarrow_acero.so.1300+0x3f1d06)
    #2 arrow::acero::AsofJoinNode::InputReceived(arrow::acero::ExecNode*, arrow::compute::ExecBatch) /arrow/cpp/src/arrow/acero/asof_join_node.cc:1692:5 (libarrow_acero.so.1300+0x3ca52c)
    #3 arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()::operator()() const /arrow/cpp/src/arrow/acero/source_node.cc:157:13 (libarrow_acero.so.1300+0x5c8d32)
    #4 std::_Function_handler<arrow::Status (), arrow::acero::(anonymous namespace)::SourceNode::SliceAndDeliverMorsel(arrow::compute::ExecBatch const&)::'lambda'()>::_M_invoke(std::_Any_data const&) /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:285:9 (libarrow_acero.so.1300+0x5c8531)
    #5 std::function<arrow::Status ()>::operator()() const /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:688:14 (libarrow_acero.so.1300+0x348f9d)
    #6 std::enable_if<((!(std::is_void<arrow::Status>::value)) && (!(is_future<arrow::Status>::value))) && ((!(arrow::Future<arrow::internal::Empty>::is_empty)) || (std::is_same<arrow::Status, arrow::Status>::value)), void>::type arrow::detail::ContinueFuture::operator()<std::function<arrow::Status ()>&, arrow::Status, arrow::Future<arrow::internal::Empty> >(arrow::Future<arrow::internal::Empty>, std::function<arrow::Status ()>&) const /arrow/cpp/src/arrow/util/future.h:150:23 (libarrow_acero.so.1300+0x5a69b4)
    #7 void std::__invoke_impl<void, arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status ()>&>(std::__invoke_other, arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status ()>&) /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (libarrow_acero.so.1300+0x5a68a9)
    #8 std::__invoke_result<arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status ()>&>::type std::__invoke<arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status ()>&>(arrow::detail::ContinueFuture&, arrow::Future<arrow::internal::Empty>&, std::function<arrow::Status ()>&) /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (libarrow_acero.so.1300+0x5a66ca)
    #9 void std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, std::function<arrow::Status ()>)>::__call<void, 0ul, 1ul>(std::tuple<>&&, std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:400:11 (libarrow_acero.so.1300+0x5a6623)
    #10 void std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, std::function<arrow::Status ()>)>::operator()<void>() /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/functional:482:17 (libarrow_acero.so.1300+0x5a6579)
    #11 arrow::internal::FnOnce<void ()>::FnImpl<std::_Bind<arrow::detail::ContinueFuture (arrow::Future<arrow::internal::Empty>, std::function<arrow::Status ()>)> >::invoke() /arrow/cpp/src/arrow/util/functional.h:152:42 (libarrow_acero.so.1300+0x5a6517)
    #12 arrow::internal::FnOnce<void ()>::operator()() && /arrow/cpp/src/arrow/util/functional.h:140:17 (libarrow.so.1300+0x1c6a250)
    #13 arrow::internal::SerialExecutor::SpawnReal(arrow::internal::TaskHints, arrow::internal::FnOnce<void ()>, arrow::StopToken, arrow::internal::FnOnce<void (arrow::Status const&)>&&)::$_1::operator()() /arrow/cpp/src/arrow/util/thread_pool.cc:89:5 (libarrow.so.1300+0x1c64c96)
    #14 arrow::internal::FnOnce<void ()>::FnImpl<arrow::internal::SerialExecutor::SpawnReal(arrow::internal::TaskHints, arrow::internal::FnOnce<void ()>, arrow::StopToken, arrow::internal::FnOnce<void (arrow::Status const&)>&&)::$_1>::invoke() /arrow/cpp/src/arrow/util/functional.h:152:42 (libarrow.so.1300+0x1c64c17)
    #15 arrow::internal::FnOnce<void ()>::operator()() && /arrow/cpp/src/arrow/util/functional.h:140:17 (libarrow.so.1300+0x1c6a250)
    #16 arrow::internal::SerialExecutor::RunLoop() /arrow/cpp/src/arrow/util/thread_pool.cc:168:9 (libarrow.so.1300+0x1c5f32a)
    #17 arrow::Future<std::shared_ptr<arrow::Table> > arrow::internal::SerialExecutor::Run<std::shared_ptr<arrow::Table>, arrow::Result<std::shared_ptr<arrow::Table> > >(arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>) /arrow/cpp/src/arrow/util/thread_pool.h:393:5 (libarrow_acero.so.1300+0x4ba38c)
    #18 arrow::Result<std::shared_ptr<arrow::Table> > arrow::internal::SerialExecutor::RunInSerialExecutor<std::shared_ptr<arrow::Table>, arrow::Future<std::shared_ptr<arrow::Table> >, arrow::Result<std::shared_ptr<arrow::Table> > >(arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>) /arrow/cpp/src/arrow/util/thread_pool.h:290:38 (libarrow_acero.so.1300+0x4b9758)
    #19 arrow::Future<std::shared_ptr<arrow::Table> >::SyncType arrow::internal::RunSynchronously<arrow::Future<std::shared_ptr<arrow::Table> >, std::shared_ptr<arrow::Table> >(arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>, bool) /arrow/cpp/src/arrow/util/thread_pool.h:497:12 (libarrow_acero.so.1300+0x47eaa2)
    #20 arrow::acero::DeclarationToTable(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*) /arrow/cpp/src/arrow/acero/exec_plan.cc:783:10 (libarrow_acero.so.1300+0x45cf02)
    #21 arrow::acero::CheckRunOutput(arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::AsofJoinNodeOptions, std::function<void (arrow::Table const&, arrow::Table const&)>) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:250:3 (arrow-acero-asof-join-node-test+0x1d8f30)
    #22 arrow::acero::BasicTest::CheckRunOutput(arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::AsofJoinNodeOptions) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:472:5 (arrow-acero-asof-join-node-test+0x26a14f)
    #23 arrow::acero::BasicTest::CheckRunOutput(arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::FieldRef, std::initializer_list<arrow::FieldRef>, long) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:458:3 (arrow-acero-asof-join-node-test+0x2730a2)
    #24 arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)::operator()(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema) const /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:514:7 (arrow-acero-asof-join-node-test+0x27ada3)
    #25 void arrow::acero::BasicTest::RunTypes<arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)>(arrow::acero::BasicTestTypes, arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:665:5 (arrow-acero-asof-join-node-test+0x27914b)
    #26 void arrow::acero::BasicTest::RunBatches<arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)>(arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:627:7 (arrow-acero-asof-join-node-test+0x2741f7)
    #27 arrow::acero::BasicTest::RunMutateByKey() /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:508:5 (arrow-acero-asof-join-node-test+0x2731d3)
    #28 arrow::acero::BasicTest::DoMutateByKey(arrow::acero::BasicTest&) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:518:67 (arrow-acero-asof-join-node-test+0x25c8f8)
    #29 std::_Function_handler<void (arrow::acero::BasicTest&), void (*)(arrow::acero::BasicTest&)>::_M_invoke(std::_Any_data const&, arrow::acero::BasicTest&) /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:300:2 (arrow-acero-asof-join-node-test+0x26c153)
    #30 std::function<void (arrow::acero::BasicTest&)>::operator()(arrow::acero::BasicTest&) const /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:688:14 (arrow-acero-asof-join-node-test+0x2032d7)
    #31 arrow::acero::AsofJoinBasicTest_TestBasic4Backward_Test::TestBody() /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:852:1 (arrow-acero-asof-join-node-test+0x1e20e8)
    #32 void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) /build/cpp/googletest_ep-prefix/src/googletest_ep/googletest/src/gtest.cc:2607:10 (libgtestd.so.1.11.0+0xc6ad3)

  Previous write of size 8 at 0x7b2c000021f0 by thread T19 (mutexes: write M687497493834895368):
    #0 arrow::acero::KeyHasher::Invalidate() /arrow/cpp/src/arrow/acero/asof_join_node.cc:499:12 (libarrow_acero.so.1300+0x3f4289)
    #1 arrow::acero::KeyHasher::HashesFor(arrow::RecordBatch const*) /arrow/cpp/src/arrow/acero/asof_join_node.cc:507:5 (libarrow_acero.so.1300+0x41affe)
    #2 arrow::acero::InputState::GetKey(arrow::RecordBatch const*, unsigned long) const /arrow/cpp/src/arrow/acero/asof_join_node.cc:750:27 (libarrow_acero.so.1300+0x419af8)
    #3 arrow::acero::InputState::GetLatestKey() const /arrow/cpp/src/arrow/acero/asof_join_node.cc:742:12 (libarrow_acero.so.1300+0x4144df)
    #4 arrow::acero::CompositeReferenceTable<64ul>::Emplace(std::vector<std::unique_ptr<arrow::acero::InputState, std::default_delete<arrow::acero::InputState> >, std::allocator<std::unique_ptr<arrow::acero::InputState, std::default_delete<arrow::acero::InputState> > > >&, arrow::acero::TolType) /arrow/cpp/src/arrow/acero/asof_join_node.cc:1012:25 (libarrow_acero.so.1300+0x40b3c8)
    #5 arrow::acero::AsofJoinNode::ProcessInner() /arrow/cpp/src/arrow/acero/asof_join_node.cc:1283:13 (libarrow_acero.so.1300+0x409c2e)
    #6 arrow::acero::AsofJoinNode::Process() /arrow/cpp/src/arrow/acero/asof_join_node.cc:1347:53 (libarrow_acero.so.1300+0x4055c9)
    #7 arrow::acero::AsofJoinNode::ProcessThread() /arrow/cpp/src/arrow/acero/asof_join_node.cc:1386:12 (libarrow_acero.so.1300+0x4052bf)
    #8 arrow::acero::AsofJoinNode::ProcessThreadWrapper(arrow::acero::AsofJoinNode*) /arrow/cpp/src/arrow/acero/asof_join_node.cc:1392:64 (libarrow_acero.so.1300+0x404978)
    #9 void std::__invoke_impl<void, void (*)(arrow::acero::AsofJoinNode*), arrow::acero::AsofJoinNode*>(std::__invoke_other, void (*&&)(arrow::acero::AsofJoinNode*), arrow::acero::AsofJoinNode*&&) /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:60:14 (libarrow_acero.so.1300+0x445b5d)
    #10 std::__invoke_result<void (*)(arrow::acero::AsofJoinNode*), arrow::acero::AsofJoinNode*>::type std::__invoke<void (*)(arrow::acero::AsofJoinNode*), arrow::acero::AsofJoinNode*>(void (*&&)(arrow::acero::AsofJoinNode*), arrow::acero::AsofJoinNode*&&) /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/invoke.h:95:14 (libarrow_acero.so.1300+0x4459dd)
    #11 void std::thread::_Invoker<std::tuple<void (*)(arrow::acero::AsofJoinNode*), arrow::acero::AsofJoinNode*> >::_M_invoke<0ul, 1ul>(std::_Index_tuple<0ul, 1ul>) /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/thread:244:13 (libarrow_acero.so.1300+0x445968)
    #12 std::thread::_Invoker<std::tuple<void (*)(arrow::acero::AsofJoinNode*), arrow::acero::AsofJoinNode*> >::operator()() /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/thread:251:11 (libarrow_acero.so.1300+0x4458e8)
    #13 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(arrow::acero::AsofJoinNode*), arrow::acero::AsofJoinNode*> > >::_M_run() /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/thread:195:13 (libarrow_acero.so.1300+0x44549f)
    #14 <null> <null> (libstdc++.so.6+0xd6de3)

  Location is heap block of size 168 at 0x7b2c000021b0 allocated by main thread:
    #0 operator new(unsigned long) <null> (arrow-acero-asof-join-node-test+0x1d1e27)
    #1 std::_MakeUniq<arrow::acero::KeyHasher>::__single_object std::make_unique<arrow::acero::KeyHasher, unsigned long&, std::vector<int, std::allocator<int> >&>(unsigned long&, std::vector<int, std::allocator<int> >&) /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/unique_ptr.h:857:30 (libarrow_acero.so.1300+0x3d086b)
    #2 arrow::acero::AsofJoinNode::Make(arrow::acero::ExecPlan*, std::vector<arrow::acero::ExecNode*, std::allocator<arrow::acero::ExecNode*> >, arrow::acero::ExecNodeOptions const&) /arrow/cpp/src/arrow/acero/asof_join_node.cc:1666:29 (libarrow_acero.so.1300+0x3c7b94)
    #3 std::_Function_handler<arrow::Result<arrow::acero::ExecNode*> (arrow::acero::ExecPlan*, std::vector<arrow::acero::ExecNode*, std::allocator<arrow::acero::ExecNode*> >, arrow::acero::ExecNodeOptions const&), arrow::Result<arrow::acero::ExecNode*> (*)(arrow::acero::ExecPlan*, std::vector<arrow::acero::ExecNode*, std::allocator<arrow::acero::ExecNode*> >, arrow::acero::ExecNodeOptions const&)>::_M_invoke(std::_Any_data const&, arrow::acero::ExecPlan*&&, std::vector<arrow::acero::ExecNode*, std::allocator<arrow::acero::ExecNode*> >&&, arrow::acero::ExecNodeOptions const&) /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:285:9 (arrow-acero-asof-join-node-test+0x3214fa)
    #4 std::function<arrow::Result<arrow::acero::ExecNode*> (arrow::acero::ExecPlan*, std::vector<arrow::acero::ExecNode*, std::allocator<arrow::acero::ExecNode*> >, arrow::acero::ExecNodeOptions const&)>::operator()(arrow::acero::ExecPlan*, std::vector<arrow::acero::ExecNode*, std::allocator<arrow::acero::ExecNode*> >, arrow::acero::ExecNodeOptions const&) const /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:688:14 (libarrow_acero.so.1300+0x49378f)
    #5 arrow::acero::MakeExecNode(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, arrow::acero::ExecPlan*, std::vector<arrow::acero::ExecNode*, std::allocator<arrow::acero::ExecNode*> >, arrow::acero::ExecNodeOptions const&, arrow::acero::ExecFactoryRegistry*) /arrow/cpp/src/arrow/acero/exec_plan.h:381:10 (libarrow_acero.so.1300+0x47dde3)
    #6 arrow::acero::Declaration::AddToPlan(arrow::acero::ExecPlan*, arrow::acero::ExecFactoryRegistry*) const /arrow/cpp/src/arrow/acero/exec_plan.cc:585:3 (libarrow_acero.so.1300+0x45a0e5)
    #7 arrow::acero::Declaration::AddToPlan(arrow::acero::ExecPlan*, arrow::acero::ExecFactoryRegistry*) const /arrow/cpp/src/arrow/acero/exec_plan.cc:581:5 (libarrow_acero.so.1300+0x459f35)
    #8 arrow::acero::(anonymous namespace)::DeclarationToTableImpl(arrow::acero::Declaration, arrow::acero::QueryOptions, arrow::internal::Executor*) /arrow/cpp/src/arrow/acero/exec_plan.cc:660:3 (libarrow_acero.so.1300+0x45bf1d)
    #9 arrow::acero::DeclarationToTable(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*)::$_2::operator()(arrow::internal::Executor*) const /arrow/cpp/src/arrow/acero/exec_plan.cc:785:16 (libarrow_acero.so.1300+0x46fc39)
    #10 arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>::FnImpl<arrow::acero::DeclarationToTable(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*)::$_2>::invoke(arrow::internal::Executor*&&) /arrow/cpp/src/arrow/util/functional.h:152:42 (libarrow_acero.so.1300+0x46fb1b)
    #11 arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>::operator()(arrow::internal::Executor*) && /arrow/cpp/src/arrow/util/functional.h:140:17 (libarrow_acero.so.1300+0x4b9628)
    #12 arrow::Future<std::shared_ptr<arrow::Table> > arrow::internal::SerialExecutor::Run<std::shared_ptr<arrow::Table>, arrow::Result<std::shared_ptr<arrow::Table> > >(arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>) /arrow/cpp/src/arrow/util/thread_pool.h:391:22 (libarrow_acero.so.1300+0x4ba33e)
    #13 arrow::Result<std::shared_ptr<arrow::Table> > arrow::internal::SerialExecutor::RunInSerialExecutor<std::shared_ptr<arrow::Table>, arrow::Future<std::shared_ptr<arrow::Table> >, arrow::Result<std::shared_ptr<arrow::Table> > >(arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>) /arrow/cpp/src/arrow/util/thread_pool.h:290:38 (libarrow_acero.so.1300+0x4b9758)
    #14 arrow::Future<std::shared_ptr<arrow::Table> >::SyncType arrow::internal::RunSynchronously<arrow::Future<std::shared_ptr<arrow::Table> >, std::shared_ptr<arrow::Table> >(arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>, bool) /arrow/cpp/src/arrow/util/thread_pool.h:497:12 (libarrow_acero.so.1300+0x47eaa2)
    #15 arrow::acero::DeclarationToTable(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*) /arrow/cpp/src/arrow/acero/exec_plan.cc:783:10 (libarrow_acero.so.1300+0x45cf02)
    #16 arrow::acero::CheckRunOutput(arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::AsofJoinNodeOptions, std::function<void (arrow::Table const&, arrow::Table const&)>) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:250:3 (arrow-acero-asof-join-node-test+0x1d8f30)
    #17 arrow::acero::BasicTest::CheckRunOutput(arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::AsofJoinNodeOptions) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:472:5 (arrow-acero-asof-join-node-test+0x26a14f)
    #18 arrow::acero::BasicTest::CheckRunOutput(arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::FieldRef, std::initializer_list<arrow::FieldRef>, long) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:458:3 (arrow-acero-asof-join-node-test+0x2730a2)
    #19 arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)::operator()(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema) const /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:514:7 (arrow-acero-asof-join-node-test+0x27ada3)
    #20 void arrow::acero::BasicTest::RunTypes<arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)>(arrow::acero::BasicTestTypes, arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:665:5 (arrow-acero-asof-join-node-test+0x27914b)
    #21 void arrow::acero::BasicTest::RunBatches<arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)>(arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:627:7 (arrow-acero-asof-join-node-test+0x2741f7)
    #22 arrow::acero::BasicTest::RunMutateByKey() /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:508:5 (arrow-acero-asof-join-node-test+0x2731d3)
    #23 arrow::acero::BasicTest::DoMutateByKey(arrow::acero::BasicTest&) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:518:67 (arrow-acero-asof-join-node-test+0x25c8f8)
    #24 std::_Function_handler<void (arrow::acero::BasicTest&), void (*)(arrow::acero::BasicTest&)>::_M_invoke(std::_Any_data const&, arrow::acero::BasicTest&) /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:300:2 (arrow-acero-asof-join-node-test+0x26c153)
    #25 std::function<void (arrow::acero::BasicTest&)>::operator()(arrow::acero::BasicTest&) const /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:688:14 (arrow-acero-asof-join-node-test+0x2032d7)
    #26 arrow::acero::AsofJoinBasicTest_TestBasic4Backward_Test::TestBody() /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:852:1 (arrow-acero-asof-join-node-test+0x1e20e8)
    #27 void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) /build/cpp/googletest_ep-prefix/src/googletest_ep/googletest/src/gtest.cc:2607:10 (libgtestd.so.1.11.0+0xc6ad3)

  Mutex M687497493834895368 is already destroyed.

  Thread T19 (tid=13687, running) created by main thread at:
    #0 pthread_create <null> (arrow-acero-asof-join-node-test+0x1435eb)
    #1 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) <null> (libstdc++.so.6+0xd70a8)
    #2 arrow::acero::AsofJoinNode::StartProducing() /arrow/cpp/src/arrow/acero/asof_join_node.cc:1719:23 (libarrow_acero.so.1300+0x3cb379)
    #3 arrow::acero::(anonymous namespace)::ExecPlanImpl::StartProducing()::'lambda'(arrow::util::AsyncTaskScheduler*)::operator()(arrow::util::AsyncTaskScheduler*) const /arrow/cpp/src/arrow/acero/exec_plan.cc:175:24 (libarrow_acero.so.1300+0x4644c8)
    #4 arrow::internal::FnOnce<arrow::Status (arrow::util::AsyncTaskScheduler*)>::FnImpl<arrow::acero::(anonymous namespace)::ExecPlanImpl::StartProducing()::'lambda'(arrow::util::AsyncTaskScheduler*)>::invoke(arrow::util::AsyncTaskScheduler*&&) /arrow/cpp/src/arrow/util/functional.h:152:42 (libarrow_acero.so.1300+0x463bbb)
    #5 arrow::internal::FnOnce<arrow::Status (arrow::util::AsyncTaskScheduler*)>::operator()(arrow::util::AsyncTaskScheduler*) && /arrow/cpp/src/arrow/util/functional.h:140:17 (libarrow.so.1300+0x1ac07a8)
    #6 arrow::util::AsyncTaskScheduler::Make(arrow::internal::FnOnce<arrow::Status (arrow::util::AsyncTaskScheduler*)>, arrow::internal::FnOnce<void (arrow::Status const&)>, arrow::StopToken) /arrow/cpp/src/arrow/util/async_util.cc:461:28 (libarrow.so.1300+0x1aac1a2)
    #7 arrow::acero::(anonymous namespace)::ExecPlanImpl::StartProducing() /arrow/cpp/src/arrow/acero/exec_plan.cc:128:35 (libarrow_acero.so.1300+0x457e92)
    #8 arrow::acero::ExecPlan::StartProducing() /arrow/cpp/src/arrow/acero/exec_plan.cc:439:59 (libarrow_acero.so.1300+0x45799a)
    #9 arrow::acero::(anonymous namespace)::DeclarationToTableImpl(arrow::acero::Declaration, arrow::acero::QueryOptions, arrow::internal::Executor*) /arrow/cpp/src/arrow/acero/exec_plan.cc:662:14 (libarrow_acero.so.1300+0x45c359)
    #10 arrow::acero::DeclarationToTable(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*)::$_2::operator()(arrow::internal::Executor*) const /arrow/cpp/src/arrow/acero/exec_plan.cc:785:16 (libarrow_acero.so.1300+0x46fc39)
    #11 arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>::FnImpl<arrow::acero::DeclarationToTable(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*)::$_2>::invoke(arrow::internal::Executor*&&) /arrow/cpp/src/arrow/util/functional.h:152:42 (libarrow_acero.so.1300+0x46fb1b)
    #12 arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>::operator()(arrow::internal::Executor*) && /arrow/cpp/src/arrow/util/functional.h:140:17 (libarrow_acero.so.1300+0x4b9628)
    #13 arrow::Future<std::shared_ptr<arrow::Table> > arrow::internal::SerialExecutor::Run<std::shared_ptr<arrow::Table>, arrow::Result<std::shared_ptr<arrow::Table> > >(arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>) /arrow/cpp/src/arrow/util/thread_pool.h:391:22 (libarrow_acero.so.1300+0x4ba33e)
    #14 arrow::Result<std::shared_ptr<arrow::Table> > arrow::internal::SerialExecutor::RunInSerialExecutor<std::shared_ptr<arrow::Table>, arrow::Future<std::shared_ptr<arrow::Table> >, arrow::Result<std::shared_ptr<arrow::Table> > >(arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>) /arrow/cpp/src/arrow/util/thread_pool.h:290:38 (libarrow_acero.so.1300+0x4b9758)
    #15 arrow::Future<std::shared_ptr<arrow::Table> >::SyncType arrow::internal::RunSynchronously<arrow::Future<std::shared_ptr<arrow::Table> >, std::shared_ptr<arrow::Table> >(arrow::internal::FnOnce<arrow::Future<std::shared_ptr<arrow::Table> > (arrow::internal::Executor*)>, bool) /arrow/cpp/src/arrow/util/thread_pool.h:497:12 (libarrow_acero.so.1300+0x47eaa2)
    #16 arrow::acero::DeclarationToTable(arrow::acero::Declaration, bool, arrow::MemoryPool*, arrow::compute::FunctionRegistry*) /arrow/cpp/src/arrow/acero/exec_plan.cc:783:10 (libarrow_acero.so.1300+0x45cf02)
    #17 arrow::acero::CheckRunOutput(arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::AsofJoinNodeOptions, std::function<void (arrow::Table const&, arrow::Table const&)>) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:250:3 (arrow-acero-asof-join-node-test+0x1d8f30)
    #18 arrow::acero::BasicTest::CheckRunOutput(arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::AsofJoinNodeOptions) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:472:5 (arrow-acero-asof-join-node-test+0x26a14f)
    #19 arrow::acero::BasicTest::CheckRunOutput(arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::acero::BatchesWithSchema const&, arrow::FieldRef, std::initializer_list<arrow::FieldRef>, long) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:458:3 (arrow-acero-asof-join-node-test+0x2730a2)
    #20 arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)::operator()(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema) const /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:514:7 (arrow-acero-asof-join-node-test+0x27ada3)
    #21 void arrow::acero::BasicTest::RunTypes<arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)>(arrow::acero::BasicTestTypes, arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:665:5 (arrow-acero-asof-join-node-test+0x27914b)
    #22 void arrow::acero::BasicTest::RunBatches<arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)>(arrow::acero::BasicTest::RunMutateByKey()::'lambda'(arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema, arrow::acero::BatchesWithSchema)) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:627:7 (arrow-acero-asof-join-node-test+0x2741f7)
    #23 arrow::acero::BasicTest::RunMutateByKey() /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:508:5 (arrow-acero-asof-join-node-test+0x2731d3)
    #24 arrow::acero::BasicTest::DoMutateByKey(arrow::acero::BasicTest&) /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:518:67 (arrow-acero-asof-join-node-test+0x25c8f8)
    #25 std::_Function_handler<void (arrow::acero::BasicTest&), void (*)(arrow::acero::BasicTest&)>::_M_invoke(std::_Any_data const&, arrow::acero::BasicTest&) /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:300:2 (arrow-acero-asof-join-node-test+0x26c153)
    #26 std::function<void (arrow::acero::BasicTest&)>::operator()(arrow::acero::BasicTest&) const /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/std_function.h:688:14 (arrow-acero-asof-join-node-test+0x2032d7)
    #27 arrow::acero::AsofJoinBasicTest_TestBasic4Backward_Test::TestBody() /arrow/cpp/src/arrow/acero/asof_join_node_test.cc:852:1 (arrow-acero-asof-join-node-test+0x1e20e8)
    #28 void testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) /build/cpp/googletest_ep-prefix/src/googletest_ep/googletest/src/gtest.cc:2607:10 (libgtestd.so.1.11.0+0xc6ad3)

SUMMARY: ThreadSanitizer: data race /arrow/cpp/src/arrow/acero/asof_join_node.cc:499:12 in arrow::acero::KeyHasher::Invalidate()
==================

Component(s)

C++


pitrou commented Jun 1, 2023

cc @rtpsw @icexelloss

@pitrou pitrou added this to the 13.0.0 milestone Jun 6, 2023
pitrou pushed a commit that referenced this issue Jun 6, 2023
### Rationale for this change

`AsofJoinNode` may run into a data race when invalidating the key hasher.

The key hasher is queried from one thread but invalidated from another. This could be simplified so that the key hasher is only ever used from one thread, but that is out of scope for this PR.

### What changes are included in this PR?

The invalidated member of the key hasher is made atomic.
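
As a rough illustration of the idea (not the actual Arrow code), the sketch below shows a hasher whose "which batch is cached" marker is a `std::atomic` pointer, so that one thread can invalidate it while another thread queries it without a data race on that member. The names `Batch`, `KeyHasherSketch`, `cached_batch_` and `hashes_` are hypothetical and chosen only for this example.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical stand-in for a record batch: a single column of keys.
struct Batch {
  std::vector<int64_t> keys;
};

// Hypothetical sketch of a key hasher with an atomic invalidation marker.
// This is NOT the Arrow KeyHasher; it only illustrates the technique.
class KeyHasherSketch {
 public:
  // Safe to call from any thread: it only writes the atomic marker.
  void Invalidate() { cached_batch_.store(nullptr); }

  // Assumed to be called from a single processing thread.
  // Recomputes the hashes when the cached batch does not match.
  const std::vector<uint64_t>& HashesFor(const Batch* batch) {
    if (cached_batch_.load() == batch) {
      return hashes_;  // cache hit
    }
    hashes_.clear();
    for (int64_t key : batch->keys) {
      hashes_.push_back(static_cast<uint64_t>(std::hash<int64_t>{}(key)));
    }
    cached_batch_.store(batch);
    return hashes_;
  }

 private:
  // Atomic, so concurrent Invalidate() and HashesFor() do not race on it.
  std::atomic<const Batch*> cached_batch_{nullptr};
  // Non-atomic: only the processing thread reads or writes this vector.
  std::vector<uint64_t> hashes_;
};
```

Under these assumptions, the atomic only removes the race on the marker itself. It does not serialize invalidation against an in-progress recomputation, which is consistent with the note above that restricting the hasher to a single thread would be a separate change.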

### Are these changes tested?

Yes, by existing tests.

### Are there any user-facing changes?

No.
* Closes: #35868

Authored-by: Yaron Gvili <rtpsw@hotmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>