Skip to content

Commit

Permalink
Merge pull request #2139 from elBoberido/iox-2137-merge-ListenerImpl-…
Browse files Browse the repository at this point in the history
…back-to-listener

iox-#2137 Merge 'ListenerImpl' back to 'Listener'
  • Loading branch information
elBoberido committed Dec 20, 2023
2 parents 154b5a5 + ecf8bf3 commit 53fce2d
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 148 deletions.
1 change: 1 addition & 0 deletions doc/website/release-notes/iceoryx-unreleased.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
- Add public functions to create an 'access_rights' object from integer values [#2108](https://github.com/eclipse-iceoryx/iceoryx/issues/2108)
- Fix `historyRequest` may be larger than `queueCapacity` during creating a subscriber [#2121](https://github.com/eclipse-iceoryx/iceoryx/issues/2121)
- Unable to acquire file status due to an unknown failure [#2023](https://github.com/eclipse-iceoryx/iceoryx/issues/2023)
- Bug in 'ListenerImpl' [#2137](https://github.com/eclipse-iceoryx/iceoryx/issues/2137)

**Refactoring:**

Expand Down
145 changes: 22 additions & 123 deletions iceoryx_posh/include/iceoryx_posh/internal/popo/listener.inl
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ namespace iox
{
namespace popo
{
template <uint64_t Capacity>
template <typename T, typename ContextDataType>
inline expected<void, ListenerError>
ListenerImpl<Capacity>::attachEvent(T& eventOrigin,
const NotificationCallback<T, ContextDataType>& eventCallback) noexcept
Listener::attachEvent(T& eventOrigin, const NotificationCallback<T, ContextDataType>& eventCallback) noexcept
{
if (eventCallback.m_callback == nullptr)
{
Expand All @@ -42,14 +40,12 @@ ListenerImpl<Capacity>::attachEvent(T& eventOrigin,
NotificationAttorney::getInvalidateTriggerMethod(eventOrigin))
.and_then([&](auto& eventId) {
NotificationAttorney::enableEvent(
eventOrigin,
TriggerHandle(*m_conditionVariableData, {*this, &ListenerImpl<Capacity>::removeTrigger}, eventId));
eventOrigin, TriggerHandle(*m_conditionVariableData, {*this, &Listener::removeTrigger}, eventId));
});
}

template <uint64_t Capacity>
template <typename T, typename EventType, typename ContextDataType, typename>
inline expected<void, ListenerError> ListenerImpl<Capacity>::attachEvent(
inline expected<void, ListenerError> Listener::attachEvent(
T& eventOrigin, const EventType eventType, const NotificationCallback<T, ContextDataType>& eventCallback) noexcept
{
if (eventCallback.m_callback == nullptr)
Expand All @@ -67,70 +63,23 @@ inline expected<void, ListenerError> ListenerImpl<Capacity>::attachEvent(
.and_then([&](auto& eventId) {
NotificationAttorney::enableEvent(
eventOrigin,
TriggerHandle(*m_conditionVariableData, {*this, &ListenerImpl<Capacity>::removeTrigger}, eventId),
TriggerHandle(*m_conditionVariableData, {*this, &Listener::removeTrigger}, eventId),
eventType);
});
}

template <uint64_t Capacity>
template <typename T, typename EventType, typename>
inline void ListenerImpl<Capacity>::detachEvent(T& eventOrigin, const EventType eventType) noexcept
{
static_assert(IS_EVENT_ENUM<EventType>,
"Only enums with an underlying EventEnumIdentifier can be attached/detached to the Listener");
NotificationAttorney::disableEvent(eventOrigin, eventType);
}

template <uint64_t Capacity>
template <typename T>
inline void ListenerImpl<Capacity>::detachEvent(T& eventOrigin) noexcept
{
NotificationAttorney::disableEvent(eventOrigin);
}

template <uint64_t Capacity>
inline constexpr uint64_t ListenerImpl<Capacity>::capacity() noexcept
{
return Capacity;
}

template <uint64_t Capacity>
inline ListenerImpl<Capacity>::ListenerImpl() noexcept
: ListenerImpl(*runtime::PoshRuntime::getInstance().getMiddlewareConditionVariable())
{
}

template <uint64_t Capacity>
inline ListenerImpl<Capacity>::ListenerImpl(ConditionVariableData& conditionVariable) noexcept
: m_conditionVariableData(&conditionVariable)
, m_conditionListener(conditionVariable)
{
m_thread = std::thread(&ListenerImpl<Capacity>::threadLoop, this);
}

template <uint64_t Capacity>
inline ListenerImpl<Capacity>::~ListenerImpl() noexcept
{
m_wasDtorCalled.store(true, std::memory_order_relaxed);
m_conditionListener.destroy();

m_thread.join();
m_conditionVariableData->m_toBeDestroyed.store(true, std::memory_order_relaxed);
}

template <uint64_t Capacity>
inline expected<uint32_t, ListenerError>
ListenerImpl<Capacity>::addEvent(void* const origin,
void* const userType,
const uint64_t eventType,
const uint64_t eventTypeHash,
internal::GenericCallbackRef_t callback,
internal::TranslationCallbackRef_t translationCallback,
const function<void(uint64_t)> invalidationCallback) noexcept
Listener::addEvent(void* const origin,
void* const userType,
const uint64_t eventType,
const uint64_t eventTypeHash,
internal::GenericCallbackRef_t callback,
internal::TranslationCallbackRef_t translationCallback,
const function<void(uint64_t)> invalidationCallback) noexcept
{
std::lock_guard<std::mutex> lock(m_addEventMutex);

for (uint32_t i = 0U; i < MAX_NUMBER_OF_EVENTS_PER_LISTENER; ++i)
for (uint32_t i = 0U; i < MAX_NUMBER_OF_EVENTS; ++i)
{
if (m_events[i]->isEqualTo(origin, eventType, eventTypeHash))
{
Expand All @@ -149,75 +98,25 @@ ListenerImpl<Capacity>::addEvent(void* const origin,
return ok(index);
}

template <uint64_t Capacity>
inline uint64_t ListenerImpl<Capacity>::size() const noexcept
{
return m_indexManager.indicesInUse();
}

template <uint64_t Capacity>
inline void ListenerImpl<Capacity>::threadLoop() noexcept
{
while (m_wasDtorCalled.load(std::memory_order_relaxed) == false)
{
auto activateNotificationIds = m_conditionListener.wait();

for (auto& id : activateNotificationIds)
{
m_events[id]->executeCallback();
}
}
}

template <uint64_t Capacity>
inline void ListenerImpl<Capacity>::removeTrigger(const uint64_t index) noexcept
{
if (index >= MAX_NUMBER_OF_EVENTS_PER_LISTENER)
{
return;
}

if (m_events[index]->reset())
{
m_indexManager.push(static_cast<uint32_t>(index));
}
}

///////////////////////
// BEGIN IndexManager_t
///////////////////////
template <uint64_t Capacity>
inline ListenerImpl<Capacity>::IndexManager_t::IndexManager_t() noexcept
template <typename T, typename EventType, typename>
inline void Listener::detachEvent(T& eventOrigin, const EventType eventType) noexcept
{
m_loffli.init(m_loffliStorage, MAX_NUMBER_OF_EVENTS_PER_LISTENER);
static_assert(IS_EVENT_ENUM<EventType>,
"Only enums with an underlying EventEnumIdentifier can be attached/detached to the Listener");
NotificationAttorney::disableEvent(eventOrigin, eventType);
}

template <uint64_t Capacity>
inline bool ListenerImpl<Capacity>::IndexManager_t::pop(uint32_t& value) noexcept
template <typename T>
inline void Listener::detachEvent(T& eventOrigin) noexcept
{
if (m_loffli.pop(value))
{
++m_indicesInUse;
return true;
}
return false;
NotificationAttorney::disableEvent(eventOrigin);
}

template <uint64_t Capacity>
inline void ListenerImpl<Capacity>::IndexManager_t::push(const uint32_t index) noexcept
inline constexpr uint64_t Listener::capacity() noexcept
{
IOX_EXPECTS(m_loffli.push(index));
--m_indicesInUse;
return MAX_NUMBER_OF_EVENTS;
}

template <uint64_t Capacity>
uint64_t ListenerImpl<Capacity>::IndexManager_t::indicesInUse() const noexcept
{
return m_indicesInUse.load(std::memory_order_relaxed);
}
/////////////////////
// END IndexManager_t
/////////////////////
} // namespace popo
} // namespace iox
#endif
32 changes: 11 additions & 21 deletions iceoryx_posh/include/iceoryx_posh/popo/listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,16 @@ enum class ListenerError
///
/// Best practice: Detach a specific event only from one specific thread and not
/// from multiple contexts.
template <uint64_t Capacity>
class ListenerImpl
class Listener
{
public:
ListenerImpl() noexcept;
ListenerImpl(const ListenerImpl&) = delete;
ListenerImpl(ListenerImpl&&) = delete;
~ListenerImpl() noexcept;
Listener() noexcept;
Listener(const Listener&) = delete;
Listener(Listener&&) = delete;
~Listener() noexcept;

ListenerImpl& operator=(const ListenerImpl&) = delete;
ListenerImpl& operator=(ListenerImpl&&) = delete;
Listener& operator=(const Listener&) = delete;
Listener& operator=(Listener&&) = delete;

/// @brief Attaches an event. Hereby the event is defined as a class T, the eventOrigin, an enum which further
/// defines the event inside the class and the corresponding callback which will be called when the event
Expand Down Expand Up @@ -168,7 +167,7 @@ class ListenerImpl
uint64_t size() const noexcept;

protected:
ListenerImpl(ConditionVariableData& conditionVariableData) noexcept;
Listener(ConditionVariableData& conditionVariableData) noexcept;

private:
class Event_t;
Expand All @@ -190,6 +189,7 @@ class ListenerImpl
PLACEHOLDER = 0
};

static constexpr uint32_t MAX_NUMBER_OF_EVENTS = MAX_NUMBER_OF_EVENTS_PER_LISTENER;

class IndexManager_t
{
Expand All @@ -200,31 +200,21 @@ class ListenerImpl
uint64_t indicesInUse() const noexcept;

using LoFFLi = concurrent::LoFFLi;
LoFFLi::Index_t m_loffliStorage[LoFFLi::requiredIndexMemorySize(Capacity) / sizeof(uint32_t)];
LoFFLi::Index_t m_loffliStorage[LoFFLi::requiredIndexMemorySize(MAX_NUMBER_OF_EVENTS) / sizeof(uint32_t)];
LoFFLi m_loffli;
std::atomic<uint64_t> m_indicesInUse{0U};
} m_indexManager;


std::thread m_thread;
concurrent::smart_lock<internal::Event_t, std::recursive_mutex> m_events[Capacity];
concurrent::smart_lock<internal::Event_t, std::recursive_mutex> m_events[MAX_NUMBER_OF_EVENTS];
std::mutex m_addEventMutex;

std::atomic_bool m_wasDtorCalled{false};
ConditionVariableData* m_conditionVariableData = nullptr;
ConditionListener m_conditionListener;
};

class Listener : public ListenerImpl<MAX_NUMBER_OF_EVENTS_PER_LISTENER>
{
public:
using Parent = ListenerImpl<MAX_NUMBER_OF_EVENTS_PER_LISTENER>;
Listener() noexcept;

protected:
Listener(ConditionVariableData& conditionVariableData) noexcept;
};

} // namespace popo
} // namespace iox

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ class NotificationAttorney
{
template <uint64_t>
friend class WaitSet;
template <uint64_t>
friend class ListenerImpl;
friend class Listener;

private:
template <typename T, typename... Targs>
Expand Down
79 changes: 77 additions & 2 deletions iceoryx_posh/source/popo/listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,89 @@ namespace iox
namespace popo
{
Listener::Listener() noexcept
: Listener(*runtime::PoshRuntime::getInstance().getMiddlewareConditionVariable())
{
}

Listener::Listener(ConditionVariableData& conditionVariableData) noexcept
: Parent(conditionVariableData)
Listener::Listener(ConditionVariableData& conditionVariable) noexcept
: m_conditionVariableData(&conditionVariable)
, m_conditionListener(conditionVariable)
{
m_thread = std::thread(&Listener::threadLoop, this);
}

Listener::~Listener() noexcept
{
m_wasDtorCalled.store(true, std::memory_order_relaxed);
m_conditionListener.destroy();

m_thread.join();
m_conditionVariableData->m_toBeDestroyed.store(true, std::memory_order_relaxed);
}

uint64_t Listener::size() const noexcept
{
return m_indexManager.indicesInUse();
}

void Listener::threadLoop() noexcept
{
while (m_wasDtorCalled.load(std::memory_order_relaxed) == false)
{
auto activateNotificationIds = m_conditionListener.wait();

for (auto& id : activateNotificationIds)
{
m_events[id]->executeCallback();
}
}
}

void Listener::removeTrigger(const uint64_t index) noexcept
{
if (index >= MAX_NUMBER_OF_EVENTS)
{
return;
}

if (m_events[index]->reset())
{
m_indexManager.push(static_cast<uint32_t>(index));
}
}

///////////////////////
// BEGIN IndexManager_t
///////////////////////
Listener::IndexManager_t::IndexManager_t() noexcept
{
m_loffli.init(m_loffliStorage, MAX_NUMBER_OF_EVENTS);
}

bool Listener::IndexManager_t::pop(uint32_t& value) noexcept
{
if (m_loffli.pop(value))
{
++m_indicesInUse;
return true;
}
return false;
}

void Listener::IndexManager_t::push(const uint32_t index) noexcept
{
IOX_EXPECTS(m_loffli.push(index));
--m_indicesInUse;
}

uint64_t Listener::IndexManager_t::indicesInUse() const noexcept
{
return m_indicesInUse.load(std::memory_order_relaxed);
}
/////////////////////
// END IndexManager_t
/////////////////////

namespace internal
{
Event_t::~Event_t() noexcept
Expand Down

0 comments on commit 53fce2d

Please sign in to comment.