Multiple threads process dataframe #181

Closed
zhouqi1727 opened this issue May 25, 2022 · 8 comments

@zhouqi1727

One thread produces data and another thread processes it. When only the producer thread is running, the program can run for a long time (more than a day), but when the processing thread runs at the same time, an error is reported within a few hours. The error occurs in append_column. Here is the code of the two functions:

producer:
while(true){
    lock_guard<std::mutex> lk(mutex1);
    if(dataframe.get_index().size()>300){
        try{
            auto index = dataframe.get_index()[0];
            auto endIndex = dataframe.get_index()[99];
            dataframe.remove_data_by_idx<double>({index,endIndex});
        }catch(const std::exception& e){
            cout<<"dataframe remove data error:"<<e.what()<<endl;
        }
        continue;
    }
    dataframe.append_index((time+1)*interval);
    dataframe.append_column("agg_price",agg_price);
    dataframe.append_column("agg_vol",agg_vol);
    dataframe.append_column("buy_price",buy_price);
    dataframe.append_column("sell_price",sell_price);
}

sales:
while(1){
    if(dataframere.get_index().size()>100){
        std::lock_guard<std::mutex> lck2(mutex2);
        new_dataframere = dataframere;
        vec = get_feats(new_dataframere,"b");
    }
    usleep(10000);
}

The two functions are executed in different threads.

@hosseinmoein
Owner

You should read the multithreading section of the documentation and see the code samples it points you to.

DataFrame is not thread-safe. If you are using the same instance of the DataFrame in multiple threads, you need to provide a SpinLock and your own mutex.

Also, I noticed that in the sales thread the check

if(dataframere.get_index().size()>100)

is not inside the mutex protection.
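
A minimal sketch of what keeping the check inside the mutex protection could look like. It uses a plain `std::vector` as a hypothetical stand-in for the DataFrame; `SharedRows` and `snapshot_if_larger` are illustrative names and not part of the library. The point is only that the size test and the copy happen under the same lock:

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical stand-in for the shared DataFrame: a vector plus the one
// mutex that every thread touching it must lock.
struct SharedRows {
    std::mutex mtx;
    std::vector<double> rows;
};

// Copies the shared rows into `out` only if more than `min_rows` are present.
// The size check and the copy are done while holding the same lock, so no
// other thread can shrink or reallocate `rows` between the two steps.
bool snapshot_if_larger(SharedRows &shared, std::size_t min_rows,
                        std::vector<double> &out) {
    std::lock_guard<std::mutex> guard(shared.mtx);
    if (shared.rows.size() > min_rows) {
        out = shared.rows;  // copy made while the lock is held
        return true;
    }
    return false;
}
```

The caller can then work on `out` without holding the lock, because it is a private copy.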

@hosseinmoein
Owner

BTW, what is the error?

@zhouqi1727
Author

The error is a segmentation fault in dataframe.append_column. I have provided a SpinLock in the thread. Thanks.

@zhouqi1727
Author

Thread 6 "main" received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffff585d700 (LWP 446988)]
0x00005555555a58d7 in __gnu_cxx::__enable_if<std::__is_scalar::__value, double*>::__type std::__fill_n_a<double*, unsigned long, double>(double*, unsigned long, double const&) ()
(gdb) bt
#0 0x00005555555a58d7 in __gnu_cxx::__enable_if<std::__is_scalar::__value, double*>::__type std::__fill_n_a<double*, unsigned long, double>(double*, unsigned long, double const&) ()
#1 0x0000555555596ab8 in double* std::fill_n<double*, unsigned long, double>(double*, unsigned long, double const&) ()
#2 0x000055555558c22b in double* std::__uninitialized_fill_n::uninit_fill_n<double*, unsigned long, double>(double*, unsigned long, double const&) ()
#3 0x000055555557e9c2 in double* std::uninitialized_fill_n<double*, unsigned long, double>(double*, unsigned long, double const&) ()
#4 0x00005555555724f1 in double* std::uninitialized_fill_n_a<double*, unsigned long, double, double>(double*, unsigned long, double const&, std::allocator&) ()
#5 0x00005555555b050d in std::vector<double, std::allocator >::_M_fill_insert(__gnu_cxx::__normal_iterator<double*, std::vector<double, std::allocator > >, unsigned long, double const&) ()
#6 0x00005555555a55bf in std::vector<double, std::allocator >::resize(unsigned long, double const&) ()
#7 0x0000555555596628 in void hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::consistent_functor::operator()<std::vector<double, std::allocator > >(std::vector<double, std::allocator >&) const ()
#8 0x000055555558bd76 in void hmdf::HeteroVector::change_impl_help<hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::consistent_functor&, double>(hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::consistent_functor&) ()
#9 0x000055555557ddf1 in void hmdf::HeteroVector::change_impl<hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::consistent_functor&, hmdf::HeteroVector::type_list, double>(hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::consistent_functor&, hmdf::HeteroVector::type_list) ()
#10 0x000055555557166d in void hmdf::HeteroVector::change<hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::consistent_functor&>(hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::consistent_functor&) ()
#11 0x000055555556a067 in void hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::make_consistent() ()
#12 0x0000555555566fa9 in void hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::remove_data_by_idx(hmdf::Index2D) ()

@zhouqi1727
Author

I updated my code like this:

mutex mutex1,mutex2;
SpinLock lock;

producer:
while(true){
    lock_guard<std::mutex> lk(mutex1);
    dataframe.set_lock(&lock);
    if(dataframe.get_index().size()>300){
        auto index = dataframe.get_index()[0];
        auto endIndex = dataframe.get_index()[99];
        dataframe.remove_data_by_idx<double>({index,endIndex});
        dataframe.remove_lock();
        continue;
    }
    dataframe.append_index((time+1)*interval);
    dataframe.append_column("agg_price",agg_price);
    dataframe.append_column("agg_vol",agg_vol);
    dataframe.append_column("buy_price",buy_price);
    dataframe.append_column("sell_price",sell_price);
    dataframe.remove_lock();
}

sales:
while(1){
    if(dataframere.get_index().size()>100){
        std::lock_guard<std::mutex> lck2(mutex2);
        dataframe.set_lock(&lock);
        new_dataframere = dataframere;
        vec = get_feats(new_dataframere,"b");
        dataframe.remove_lock();
    }
    usleep(10000);
}

But I got this error:

Thread 7 "main" received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffff505c700 (LWP 454182)]
0x00005555555af84b in std::__detail::_Hash_code_base<hmdf::HeteroVector const*, std::pair<hmdf::HeteroVector const* const, std::vector<double, std::allocator > >, std::__detail::_Select1st, std::hash<hmdf::HeteroVector const*>, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, false>::_M_bucket_index(std::__detail::_Hash_node<std::pair<hmdf::HeteroVector const* const, std::vector<double, std::allocator > >, false> const*, unsigned long) const ()
(gdb) bt
#0 0x00005555555af84b in std::__detail::_Hash_code_base<hmdf::HeteroVector const*, std::pair<hmdf::HeteroVector const* const, std::vector<double, std::allocator > >, std::__detail::_Select1st, std::hash<hmdf::HeteroVector const*>, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, false>::_M_bucket_index(std::__detail::_Hash_node<std::pair<hmdf::HeteroVector const* const, std::vector<double, std::allocator > >, false> const*, unsigned long) const ()
#1 0x00005555555a44f1 in std::_Hashtable<hmdf::HeteroVector const*, std::pair<hmdf::HeteroVector const* const, std::vector<double, std::allocator > >, std::allocator<std::pair<hmdf::HeteroVector const* const, std::vector<double, std::allocator > > >, std::__detail::_Select1st, std::equal_to<hmdf::HeteroVector const*>, std::hash<hmdf::HeteroVector const*>, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::__detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits<false, false, true> >::_M_bucket_index(std::__detail::_Hash_node<std::pair<hmdf::HeteroVector const* const, std::vector<double, std::allocator > >, false>) const ()
#2 0x0000555555595793 in std::_Hashtable<hmdf::HeteroVector const*, std::pair<hmdf::HeteroVector const* const, std::vector<double, std::allocator > >, std::allocator<std::pair<hmdf::HeteroVector const* const, std::vector<double, std::allocator > > >, std::__detail::_Select1st, std::equal_to<hmdf::HeteroVector const*>, std::hash<hmdf::HeteroVector const*>, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::__detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits<false, false, true> >::_M_find_before_node(unsigned long, hmdf::HeteroVector const* const&, unsigned long) const ()
#3 0x000055555558a7c0 in std::_Hashtable<hmdf::HeteroVector const*, std::pair<hmdf::HeteroVector const* const, std::vector<double, std::allocator > >, std::allocator<std::pair<hmdf::HeteroVector const* const, std::vector<double, std::allocator > > >, std::__detail::_Select1st, std::equal_to<hmdf::HeteroVector const*>, std::hash<hmdf::HeteroVector const*>, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::__detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits<false, false, true> >::_M_find_node(unsigned long, hmdf::HeteroVector const* const&, unsigned long) const ()
#4 0x000055555557c96d in std::_Hashtable<hmdf::HeteroVector const*, std::pair<hmdf::HeteroVector const* const, std::vector<double, std::allocator > >, std::allocator<std::pair<hmdf::HeteroVector const* const, std::vector<double, std::allocator > > >, std::__detail::_Select1st, std::equal_to<hmdf::HeteroVector const*>, std::hash<hmdf::HeteroVector const*>, std::__detail::_Mod_range_hashing, std::__detail::_Default_ranged_hash, std::__detail::_Prime_rehash_policy, std::__detail::_Hashtable_traits<false, false, true> >::find(hmdf::HeteroVector const* const&) ()
#5 0x0000555555570a65 in std::unordered_map<hmdf::HeteroVector const*, std::vector<double, std::allocator >, std::hash<hmdf::HeteroVector const*>, std::equal_to<hmdf::HeteroVector const*>, std::allocator<std::pair<hmdf::HeteroVector const* const, std::vector<double, std::allocator > > > >::find(hmdf::HeteroVector const* const&) ()
#6 0x0000555555569703 in std::vector<double, std::allocator >& hmdf::HeteroVector::get_vector() ()
#7 0x00005555555666c8 in hmdf::type_declare<hmdf::HeteroVector, double>::type& hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::get_column(char const*) ()
#8 0x0000555555566ef1 in unsigned long hmdf::DataFrame<unsigned long, hmdf::HeteroVector>::append_column(char const*, double const&, hmdf::nan_policy) ()

@hosseinmoein
Owner

hosseinmoein commented May 26, 2022

I really don't know the logic of your program, so I am guessing here.
If you only have one instance of DataFrame in your entire process and you are protecting it with your own mutex, then you do not need a SpinLock.

Again, I don't know your logic but from what I can tell, you want something like this:

std::mutex   mutex1;

// producer:
while (true)  {
    std::lock_guard<std::mutex> lk (mutex1);

    if (dataframe.get_index().size() > 300)  {
        auto index = dataframe.get_index()[0];
        auto endIndex = dataframe.get_index()[99];

        dataframe.remove_data_by_idx({ index, endIndex });
        continue;
    }
    dataframe.append_index ((time + 1) * interval);
    dataframe.append_column ("agg_price",agg_price);
    dataframe.append_column ("agg_vol",agg_vol);
    dataframe.append_column ("buy_price",buy_price);
    dataframe.append_column ("sell_price",sell_price);
}

// sales:
while (true)  {
    {
        std::lock_guard<std::mutex> lk (mutex1);

        if (dataframere.get_index().size() > 100)  {
            new_dataframere = dataframere ;
            vec = get_feats(new_dataframere ,"b");
        } 
    }

    usleep(10000);
}
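
Both loops above lock mutex1, whereas the earlier version locked mutex1 in the producer and mutex2 in the sales thread. Two different mutexes do not exclude each other, which the following standard-library-only sketch illustrates. The helper name `can_enter_while_held` is hypothetical and nothing here touches the DataFrame API:

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Returns true if a second thread manages to acquire `other` while the
// calling thread is holding `held`.
bool can_enter_while_held(std::mutex &held, std::mutex &other) {
    std::lock_guard<std::mutex> guard(held);
    bool entered = false;
    std::thread t([&] {
        // try_lock is allowed to fail spuriously, so retry a few times.
        for (int attempt = 0; attempt < 100 && !entered; ++attempt) {
            if (other.try_lock()) {
                entered = true;
                other.unlock();
            }
        }
    });
    t.join();  // join before `guard` is released
    return entered;
}
```

Called with two distinct mutexes, the second thread gets in while the first lock is held; called with the same mutex for both arguments, it does not. Only the second case protects shared data.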

@zhouqi1727
Author

I tried the scheme you suggested before, but it does not work for me: get_feats never runs. The sales thread never gets in, and the lock is always held by the producer.

@hosseinmoein
Owner

As I said, I don’t know the logic of your process, so you will have to adjust the code I gave you. You should probably also sleep in the producer.
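
One possible reading of "sleep in the producer", sketched with the standard library only. A `std::vector` is a hypothetical stand-in for the DataFrame, and `Shared`, `producer`, `consumer`, and `run_demo` are illustrative names. The producer releases the mutex before it sleeps, so the other thread has a window in which to take the lock; the 1 ms pause is an arbitrary assumption:

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the shared DataFrame and its single mutex.
struct Shared {
    std::mutex mutex1;
    std::vector<double> rows;
    std::atomic<bool> done{false};
};

void producer(Shared &s, int iterations) {
    for (int i = 0; i < iterations; ++i) {
        {
            std::lock_guard<std::mutex> lk(s.mutex1);
            if (s.rows.size() > 300)
                s.rows.erase(s.rows.begin(), s.rows.begin() + 100);  // drop oldest
            else
                s.rows.push_back(static_cast<double>(i));
        }  // lock released here, before sleeping
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    s.done = true;
}

// Returns how many non-empty snapshots the consumer managed to take.
int consumer(Shared &s) {
    int snapshots = 0;
    while (!s.done) {
        std::vector<double> copy;
        {
            std::lock_guard<std::mutex> lk(s.mutex1);
            copy = s.rows;  // copy under the lock
        }
        if (!copy.empty())
            ++snapshots;    // real processing would happen here, unlocked
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    return snapshots;
}

int run_demo(int iterations) {
    Shared s;
    int snapshots = 0;
    std::thread prod(producer, std::ref(s), iterations);
    std::thread cons([&] { snapshots = consumer(s); });
    prod.join();
    cons.join();
    assert(s.rows.size() <= 301);  // producer trims once it exceeds 300 rows
    return snapshots;
}
```

If the producer should instead wake the consumer exactly when new data arrives, a `std::condition_variable` would be the more usual tool; the sleep is just the smallest change to the loops discussed above.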
