Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix "pure virtual method called" in kafka_source #34

Open
wants to merge 6 commits into
base: master
Choose a base branch
from

Conversation

vyskubov
Copy link

@vyskubov vyskubov commented Sep 1, 2021

Sometimes there happens a call to kafka_source_base::parse() which is pure virtual.

This patch moves initialization of kafka_source::_thread from kafka_source_base initializer list to a call kafka_source_base::start() when kafka_source::thread_f() is available.

@vyskubov vyskubov changed the title fix "pure virtual method called" in kafka_sink fix "pure virtual method called" in kafka_source Sep 1, 2021
@skarlsson
Copy link
Member

the "while(!_started)
std::this_thread::sleep_for(std::chrono::milliseconds(100)); " part in thread_f() is supposed to protect about this and I don't think the suggested patch changes anything (for better or worse) . When do you see this error?

@vyskubov
Copy link
Author

vyskubov commented Sep 2, 2021

The error "pure virtual method called" happens on 146 line.

And now I caught it on patched version of kspp.

So, it may be that we are calling a parse() method when the object is about to be destructed and kafka_source::parse() deleted from virtual table at that moment. And a call to kafka_source_base::parse() is performed.

In my program I create and destroy the topology in a loop. So constructors and destructors a called pretty frequently.

Here is a backtrace (frame 6):

#0  __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:50
#1  0x00007ffff76c7859 in __GI_abort () at abort.c:79
#2  0x00007ffff794f911 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#3  0x00007ffff795b38c in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#4  0x00007ffff795b3f7 in std::terminate() () from /lib/x86_64-linux-gnu/libstdc++.so.6
#5  0x00007ffff795c155 in __cxa_pure_virtual () from /lib/x86_64-linux-gnu/libstdc++.so.6
#6  0x000055555558a19f in kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::thread_f (this=0x555555e38ac0) at /usr/include/kspp/sources/kafka_source.h:146
#7  0x000055555558fadb in std::__invoke_impl<void, void (kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::*)(), kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>*> (
    __f=@0x555555e39040: (void (kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::*)(class kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes> * const)) 0x555555589e4e <kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::thread_f()>, __t=@0x555555e39038: 0x555555e38ac0) at /usr/include/c++/9/bits/invoke.h:73
#8  0x000055555558f737 in std::__invoke<void (kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::*)(), kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>*> (
    __fn=@0x555555e39040: (void (kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::*)(class kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes> * const)) 0x555555589e4e <kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::thread_f()>) at /usr/include/c++/9/bits/invoke.h:95
#9  0x000055555558f093 in std::thread::_Invoker<std::tuple<void (kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::*)(), kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>*> >::_M_invoke<0ul, 1ul>
    (this=0x555555e39038) at /usr/include/c++/9/thread:244
#10 0x000055555558e49c in std::thread::_Invoker<std::tuple<void (kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::*)(), kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>*> >::operator() (
    this=0x555555e39038) at /usr/include/c++/9/thread:251
#11 0x000055555558ca6e in std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>::*)(), kspp::kafka_source_base<void, CustomStruct, void, kspp::binary_serdes>*> > >::_M_run (this=0x555555e39030) at /usr/include/c++/9/thread:195
#12 0x00007ffff7987de4 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#13 0x00007ffff7e7a609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#14 0x00007ffff77c4293 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

wait for kafka_source_base::thread_f() to finish before closing and destroying kafka_source
@vyskubov vyskubov marked this pull request as draft September 2, 2021 10:02
@vyskubov
Copy link
Author

vyskubov commented Sep 6, 2021

That will do. Now segfault'd gone.

Should I create new PR without mistakes or you squash these commits?

Or you want to see more deeply why the error happens?

@vyskubov vyskubov marked this pull request as ready for review September 8, 2021 09:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants