Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(system_monitor): multithreading support for boost::process #1714

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ class NTPMonitor : public rclcpp::Node
* @brief function to execute chronyc
* @param [out] outOffset offset value of NTP time
* @param [out] out_tracking_map "chronyc tracking" output for diagnostic
* @return if error occurred, return error string
* @param [out] pipe2_err_str if pipe2 error occurred, return error string
* @return if chronyc error occurred, return error string
*/
std::string executeChronyc(
float & outOffset, std::map<std::string, std::string> & out_tracking_map);
float & outOffset, std::map<std::string, std::string> & out_tracking_map,
std::string & pipe2_err_str);

diagnostic_updater::Updater updater_; //!< @brief Updater class which advertises to /diagnostics

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ class ProcessMonitor : public rclcpp::Node
memory_tasks_; //!< @brief list of diagnostics tasks for high memory procs
rclcpp::TimerBase::SharedPtr timer_; //!< @brief timer to execute top command
std::string top_output_; //!< @brief output from top command
bool is_top_error_; //!< @brief flag if an error occurs
bool is_top_error_; //!< @brief flag if an top error occurs
bool is_pipe2_error_; //!< @brief flag if an pipe2 error occurs
double elapsed_ms_; //!< @brief Execution time of top command
std::mutex mutex_; //!< @brief mutex for output from top command
rclcpp::CallbackGroup::SharedPtr timer_callback_group_; //!< @brief Callback Group
Expand Down
27 changes: 25 additions & 2 deletions system/system_monitor/src/cpu_monitor/cpu_monitor_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,31 @@ void CPUMonitorBase::checkUsage(diagnostic_updater::DiagnosticStatusWrapper & st
}

// Get CPU Usage
bp::ipstream is_out;
bp::ipstream is_err;

// boost::process create file descriptor without O_CLOEXEC required for multithreading.
// So create file descriptor with O_CLOEXEC and pass it to boost::process.
int out_fd[2];
if (pipe2(out_fd, O_CLOEXEC) != 0) {
stat.summary(DiagStatus::ERROR, "pipe2 error");
stat.add("pipe2", strerror(errno));
cpu_usage.all.status = CpuStatus::STALE;
publishCpuUsage(cpu_usage);
return;
}
bp::pipe out_pipe{out_fd[0], out_fd[1]};
bp::ipstream is_out{std::move(out_pipe)};

int err_fd[2];
if (pipe2(err_fd, O_CLOEXEC) != 0) {
stat.summary(DiagStatus::ERROR, "pipe2 error");
stat.add("pipe2", strerror(errno));
cpu_usage.all.status = CpuStatus::STALE;
publishCpuUsage(cpu_usage);
return;
}
bp::pipe err_pipe{err_fd[0], err_fd[1]};
bp::ipstream is_err{std::move(err_pipe)};

bp::child c("mpstat -P ALL 1 1 -o JSON", bp::std_out > is_out, bp::std_err > is_err);
c.wait();

Expand Down
47 changes: 43 additions & 4 deletions system/system_monitor/src/hdd_monitor/hdd_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,31 @@ void HDDMonitor::checkUsage(diagnostic_updater::DiagnosticStatusWrapper & stat)

for (auto itr = hdd_params_.begin(); itr != hdd_params_.end(); ++itr, ++hdd_index) {
// Get summary of disk space usage of ext4
bp::ipstream is_out;
bp::ipstream is_err;

// boost::process create file descriptor without O_CLOEXEC required for multithreading.
// So create file descriptor with O_CLOEXEC and pass it to boost::process.
int out_fd[2];
if (pipe2(out_fd, O_CLOEXEC) != 0) {
error_str = "pipe2 error";
stat.add(fmt::format("HDD {}: status", hdd_index), "pipe2 error");
stat.add(fmt::format("HDD {}: name", hdd_index), itr->first.c_str());
stat.add(fmt::format("HDD {}: pipe2", hdd_index), strerror(errno));
continue;
}
bp::pipe out_pipe{out_fd[0], out_fd[1]};
bp::ipstream is_out{std::move(out_pipe)};

int err_fd[2];
if (pipe2(err_fd, O_CLOEXEC) != 0) {
error_str = "pipe2 error";
stat.add(fmt::format("HDD {}: status", hdd_index), "pipe2 error");
stat.add(fmt::format("HDD {}: name", hdd_index), itr->first.c_str());
stat.add(fmt::format("HDD {}: pipe2", hdd_index), strerror(errno));
continue;
}
bp::pipe err_pipe{err_fd[0], err_fd[1]};
bp::ipstream is_err{std::move(err_pipe)};

// Invoke shell to use shell wildcard expansion
bp::child c(
"/bin/sh", "-c", fmt::format("df -Pm {}*", itr->first.c_str()), bp::std_out > is_out,
Expand Down Expand Up @@ -324,8 +347,24 @@ void HDDMonitor::getHDDParams()
std::string HDDMonitor::getDeviceFromMountPoint(const std::string & mount_point)
{
std::string ret;
bp::ipstream is_out;
bp::ipstream is_err;

// boost::process create file descriptor without O_CLOEXEC required for multithreading.
// So create file descriptor with O_CLOEXEC and pass it to boost::process.
int out_fd[2];
if (pipe2(out_fd, O_CLOEXEC) != 0) {
RCLCPP_ERROR(get_logger(), "Failed to execute pipe2. %s", strerror(errno));
return "";
}
bp::pipe out_pipe{out_fd[0], out_fd[1]};
bp::ipstream is_out{std::move(out_pipe)};

int err_fd[2];
if (pipe2(err_fd, O_CLOEXEC) != 0) {
RCLCPP_ERROR(get_logger(), "Failed to execute pipe2. %s", strerror(errno));
return "";
}
bp::pipe err_pipe{err_fd[0], err_fd[1]};
bp::ipstream is_err{std::move(err_pipe)};

bp::child c(
"/bin/sh", "-c", fmt::format("findmnt -n -o SOURCE {}", mount_point.c_str()),
Expand Down
23 changes: 21 additions & 2 deletions system/system_monitor/src/mem_monitor/mem_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,27 @@ void MemMonitor::checkUsage(diagnostic_updater::DiagnosticStatusWrapper & stat)
const auto t_start = SystemMonitorUtility::startMeasurement();

// Get total amount of free and used memory
bp::ipstream is_out;
bp::ipstream is_err;

// boost::process create file descriptor without O_CLOEXEC required for multithreading.
// So create file descriptor with O_CLOEXEC and pass it to boost::process.
int out_fd[2];
if (pipe2(out_fd, O_CLOEXEC) != 0) {
stat.summary(DiagStatus::ERROR, "pipe2 error");
stat.add("pipe2", strerror(errno));
return;
}
bp::pipe out_pipe{out_fd[0], out_fd[1]};
bp::ipstream is_out{std::move(out_pipe)};

int err_fd[2];
if (pipe2(err_fd, O_CLOEXEC) != 0) {
stat.summary(DiagStatus::ERROR, "pipe2 error");
stat.add("pipe2", strerror(errno));
return;
}
bp::pipe err_pipe{err_fd[0], err_fd[1]};
bp::ipstream is_err{std::move(err_pipe)};

bp::child c("free -tb", bp::std_out > is_out, bp::std_err > is_err);
c.wait();
if (c.exit_code() != 0) {
Expand Down
23 changes: 20 additions & 3 deletions system/system_monitor/src/ntp_monitor/ntp_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,15 @@ void NTPMonitor::checkOffset(diagnostic_updater::DiagnosticStatusWrapper & stat)
}

std::string error_str;
std::string pipe2_err_str;
float offset = 0.0f;
std::map<std::string, std::string> tracking_map;
error_str = executeChronyc(offset, tracking_map);
error_str = executeChronyc(offset, tracking_map, pipe2_err_str);
if (!pipe2_err_str.empty()) {
stat.summary(DiagStatus::ERROR, "pipe2 error");
stat.add("pipe2", pipe2_err_str);
return;
}
if (!error_str.empty()) {
stat.summary(DiagStatus::ERROR, "chronyc error");
stat.add("chronyc", error_str);
Expand All @@ -92,12 +98,23 @@ void NTPMonitor::checkOffset(diagnostic_updater::DiagnosticStatusWrapper & stat)
}

std::string NTPMonitor::executeChronyc(
float & out_offset, std::map<std::string, std::string> & out_tracking_map)
float & out_offset, std::map<std::string, std::string> & out_tracking_map,
std::string & pipe2_err_str)
{
std::string result;

// Tracking chrony status
bp::ipstream is_out;

// boost::process create file descriptor without O_CLOEXEC required for multithreading.
// So create file descriptor with O_CLOEXEC and pass it to boost::process.
int out_fd[2];
if (pipe2(out_fd, O_CLOEXEC) != 0) {
pipe2_err_str = std::string(strerror(errno));
return result;
}
bp::pipe out_pipe{out_fd[0], out_fd[1]};
bp::ipstream is_out{std::move(out_pipe)};

bp::child c("chronyc tracking", bp::std_out > is_out);
c.wait();
if (c.exit_code() != 0) {
Expand Down
Loading