Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iox #489 replace introspection threads with periodic task #490

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion iceoryx_binding_c/source/c_user_trigger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,3 @@ void iox_user_trigger_reset_trigger(iox_user_trigger_t const self)
{
self->resetTrigger();
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@
#include "iceoryx_posh/internal/roudi/roudi_process.hpp"
#include "iceoryx_posh/mepoo/mepoo_config.hpp"
#include "iceoryx_posh/roudi/introspection_types.hpp"
#include "iceoryx_utils/cxx/method_callback.hpp"
#include "iceoryx_utils/internal/concurrent/periodic_task.hpp"

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>

namespace iox
{
Expand All @@ -41,14 +37,6 @@ namespace roudi
template <typename MemoryManager, typename SegmentManager, typename PublisherPort>
class MemPoolIntrospection
{
private:
enum RunLevel
{
RUN,
WAIT,
TERMINATE
};

public:
/// @brief The constructor for the MemPoolIntrospection.
/// It starts a thread and set it into a wait condition.
Expand All @@ -68,54 +56,38 @@ class MemPoolIntrospection
MemPoolIntrospection(MemPoolIntrospection&&) = delete;
MemPoolIntrospection& operator=(MemPoolIntrospection&&) = delete;

/// @brief This function sets the thread into run mode, which periodically
/// sends snapshots of the mempool introspecton data to the client.
/// The send interval can be set by @ref setSnapshotInterval "setSnapshotInterval(...)".
void start() noexcept;

/// @brief This functions sets the thread into a wait state and
/// the transmission of the introspection data is stopped.
void wait() noexcept;
/// @brief This function starts the periodic transmission of snapshots of the mempool introspecton data.
/// The send interval can be set by @ref setSendInterval "setSendInterval(...)". By default it's 1 second.
void run() noexcept;

/// @brief This function stops the thread which sends the introspection data.
/// It is not possible to start the thread again.
void terminate() noexcept;
void stop() noexcept;

/// @brief This function configures the interval for the transmission of the
/// mempool introspection data.
///
/// @param snapshotInterval_ms is the interval time in milliseconds
void setSnapshotInterval(unsigned int snapshotInterval_ms) noexcept;
/// @param[in] interval duration between two send invocations
void setSendInterval(const units::Duration interval) noexcept;

protected:
MemoryManager* m_rouDiInternalMemoryManager{nullptr}; // mempool handler needs to outlive this class (!)
SegmentManager* m_segmentManager{nullptr};
PublisherPort m_publisherPort{nullptr};
void send() noexcept;

private:
std::chrono::milliseconds m_snapShotInterval{1000};

std::atomic<RunLevel> m_runLevel{WAIT};
std::condition_variable m_waitConditionVar;
std::mutex m_mutex;
std::thread m_thread;

void run() noexcept;
void waitForRunLevelChange() noexcept;
void wakeUp(RunLevel newLevel = RUN) noexcept;

// wait until start command, run until wait or terminate, restart from wait
// is possible terminate call leads to exit
void threadMain() noexcept;

static void prepareIntrospectionSample(MemPoolIntrospectionInfo& sample,
const posix::PosixGroup& readerGroup,
const posix::PosixGroup& writerGroup,
uint32_t id) noexcept;
void send() noexcept;

/// @brief copy data fro internal struct into interface struct
void copyMemPoolInfo(const MemoryManager& memoryManager, MemPoolInfoContainer& dest) noexcept;

private:
units::Duration m_sendInterval{units::Duration::seconds<unsigned long long int>(1)};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using namespace iox::units::duration_literals will enable you to use the user-defined literal operation 1_s Locally in a class it should not lead to evil namespace polution.

Reminds me of plan to clean up the using namespace in iceoryx_posh_types.hpp 😋

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's unfortunately not possible within a class

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, you're right. One thing I saw when checking the coreguidlines. They explicitly exclude using namespace std::units::duration_literals, see. So we might leave them in headers but probably not in iceoryx_posh_types.hpp as this will increase lookup times.

concurrent::PeriodicTask<cxx::MethodCallback<void>> m_publishingTask{
concurrent::PeriodicTaskManualStart, "MemPoolIntr", *this, &MemPoolIntrospection::send};
};

/// @brief typedef for the templated mempool introspection class that is used by RouDi for the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,98 +22,48 @@ namespace iox
namespace roudi
{
template <typename MemoryManager, typename SegmentManager, typename PublisherPort>
MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::MemPoolIntrospection(
inline MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::MemPoolIntrospection(
MemoryManager& rouDiInternalMemoryManager, SegmentManager& segmentManager, PublisherPort&& publisherPort)
: m_rouDiInternalMemoryManager(&rouDiInternalMemoryManager)
, m_segmentManager(&segmentManager)
, m_publisherPort(std::move(publisherPort))
, m_thread(&MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::threadMain, this)
{
m_publisherPort.offer();

posix::setThreadName(m_thread.native_handle(), "MemPoolIntr");
}

template <typename MemoryManager, typename SegmentManager, typename PublisherPort>
MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::~MemPoolIntrospection()
inline MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::~MemPoolIntrospection()
{
stop();
m_publisherPort.stopOffer();
terminate();
if (m_thread.joinable())
{
m_thread.join();
}
}

template <typename MemoryManager, typename SegmentManager, typename PublisherPort>
void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::wakeUp(RunLevel newLevel) noexcept
{
std::unique_lock<std::mutex> lock(m_mutex);
m_runLevel.store(newLevel, std::memory_order_seq_cst);
m_waitConditionVar.notify_one();
}

template <typename MemoryManager, typename SegmentManager, typename PublisherPort>
void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::start() noexcept
inline void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::run() noexcept
{
wakeUp(RUN);
m_publishingTask.start(m_sendInterval);
}

template <typename MemoryManager, typename SegmentManager, typename PublisherPort>
void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::wait() noexcept
inline void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::stop() noexcept
{
wakeUp(WAIT);
}

template <typename MemoryManager, typename SegmentManager, typename PublisherPort>
void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::terminate() noexcept
{
wakeUp(TERMINATE);
}

template <typename MemoryManager, typename SegmentManager, typename PublisherPort>
void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::setSnapshotInterval(
unsigned int snapshotInterval_ms) noexcept
{
m_snapShotInterval = std::chrono::milliseconds(snapshotInterval_ms);
}

template <typename MemoryManager, typename SegmentManager, typename PublisherPort>
void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::run() noexcept
{
while (m_runLevel.load(std::memory_order_seq_cst) == RUN)
{
send();
// TODO: could use sleep_until to avoid drift but a small drift is
// not critical here
std::this_thread::sleep_for(m_snapShotInterval);
}
}

template <typename MemoryManager, typename SegmentManager, typename PublisherPort>
void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::waitForRunLevelChange() noexcept
{
std::unique_lock<std::mutex> lock(m_mutex);
while (m_runLevel.load(std::memory_order_seq_cst) == WAIT)
{
m_waitConditionVar.wait(lock);
}
m_publishingTask.stop();
}

// wait until start command, run until wait or terminate, restart from wait
// is possible terminate call leads to exit
template <typename MemoryManager, typename SegmentManager, typename PublisherPort>
void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::threadMain() noexcept
inline void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::setSendInterval(
const units::Duration interval) noexcept
{
while (m_runLevel.load(std::memory_order_seq_cst) != TERMINATE)
m_sendInterval = interval;
if (m_publishingTask.isActive())
{
waitForRunLevelChange();
run();
m_publishingTask.stop();
m_publishingTask.start(m_sendInterval);
}
}

template <typename MemoryManager, typename SegmentManager, typename PublisherPort>
void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::prepareIntrospectionSample(
inline void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::prepareIntrospectionSample(
MemPoolIntrospectionInfo& sample,
const posix::PosixGroup& readerGroup,
const posix::PosixGroup& writerGroup,
Expand All @@ -128,7 +78,7 @@ void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::prepare


template <typename MemoryManager, typename SegmentManager, typename PublisherPort>
void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::send() noexcept
inline void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::send() noexcept
{
if (m_publisherPort.hasSubscribers())
{
Expand Down Expand Up @@ -188,8 +138,9 @@ void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::send()

// copy data fro internal struct into interface struct
template <typename MemoryManager, typename SegmentManager, typename PublisherPort>
void MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::copyMemPoolInfo(
const MemoryManager& memoryManager, MemPoolInfoContainer& dest) noexcept
inline void
MemPoolIntrospection<MemoryManager, SegmentManager, PublisherPort>::copyMemPoolInfo(const MemoryManager& memoryManager,
MemPoolInfoContainer& dest) noexcept
{
auto numOfMemPools = memoryManager.getNumberOfMemPools();
dest = MemPoolInfoContainer(numOfMemPools, MemPoolInfo());
Expand Down
Loading