Skip to content

Commit

Permalink
Add async Database methods
Browse files Browse the repository at this point in the history
Signed-off-by: jparisu <javierparis@eprosima.com>
  • Loading branch information
jparisu committed Sep 22, 2022
1 parent 2a4c1d4 commit 5a5464e
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <ddsrouter_utils/collection/database/IWatchDataBase.hpp>
#include <ddsrouter_utils/thread/manager/IManager.hpp>
#include <ddsrouter_utils/thread/connector/SlotConnector.hpp>
#include <ddsrouter_utils/types/Atomicable.hpp>

namespace eprosima {
Expand All @@ -32,7 +33,8 @@ enum DataBaseActionKind
{
add = 0,
modify = 1,
remove = 2
remove = 2,
enum_size = 3,
};

template <typename Key, typename Value>
Expand Down Expand Up @@ -67,6 +69,12 @@ class StdWatchDataBase : public IWatchDataBase<Key, Value>
virtual void register_deletion_callback(
const CallbackType& callback) override;

virtual void async_add(Key key, Value value);

virtual void async_modify(Key key, Value value);

virtual void async_remove(Key key);

protected:

void call_callback_common_(
Expand All @@ -82,7 +90,15 @@ class StdWatchDataBase : public IWatchDataBase<Key, Value>

SharedAtomicable<std::map<Key, Value>> map_database_;

std::array<SharedAtomicable<std::vector<CallbackType>>, 3> callbacks_;
std::array<
SharedAtomicable<
std::vector<
thread::SlotConnector<Key, Value>>>,
DataBaseActionKind::enum_size> callbacks_;

thread::SlotConnector<Key, Value> add_slot_connector_;
thread::SlotConnector<Key, Value> modify_slot_connector_;
thread::SlotConnector<Key> remove_slot_connector_;

};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ template <typename Key, typename Value>
StdWatchDataBase<Key, Value>::StdWatchDataBase(
std::shared_ptr<thread::IManager> thread_manager)
: thread_manager_(thread_manager)
, add_slot_connector_(
thread::SlotConnector<Key, Value>(
thread_manager.get(),
[this](Key key, Value value){ this->add(key, value); }))
, modify_slot_connector_(
thread::SlotConnector<Key, Value>(
thread_manager.get(),
[this](Key key, Value value){ this->modify(key, value); }))
, remove_slot_connector_(
thread::SlotConnector<Key>(
thread_manager.get(),
[this](Key key){ this->remove(key); }))
{
}

Expand Down Expand Up @@ -126,16 +138,34 @@ void StdWatchDataBase<Key, Value>::register_deletion_callback(
register_callback_common_(callback, DataBaseActionKind::remove);
}

template <typename Key, typename Value>
void StdWatchDataBase<Key, Value>::async_add(Key key, Value value)
{
add_slot_connector_.execute(key, value);
}

template <typename Key, typename Value>
void StdWatchDataBase<Key, Value>::async_modify(Key key, Value value)
{
modify_slot_connector_.execute(key, value);
}

template <typename Key, typename Value>
void StdWatchDataBase<Key, Value>::async_remove(Key key)
{
remove_slot_connector_.execute(key);
}

template <typename Key, typename Value>
void StdWatchDataBase<Key, Value>::call_callback_common_(
const Key& key,
const Value& value,
DataBaseActionKind action_kind)
{
std::shared_lock<std::shared_timed_mutex> lock_db(callbacks_[action_kind]);
for (auto& callback : callbacks_[action_kind])
for (auto& slot : callbacks_[action_kind])
{
callback(key, value);
slot.execute(key, value);
}
}

Expand All @@ -145,7 +175,10 @@ void StdWatchDataBase<Key, Value>::register_callback_common_(
DataBaseActionKind action_kind)
{
std::unique_lock<std::shared_timed_mutex> lock_db(callbacks_[action_kind]);
callbacks_[action_kind].push_back(callback);
callbacks_[action_kind].push_back(
thread::SlotConnector<Key, Value>(
thread_manager_.get(),
callback));
}

} /* namespace utils */
Expand Down

0 comments on commit 5a5464e

Please sign in to comment.