[32] Allow modules to show warnings #203

Merged
16 changes: 15 additions & 1 deletion crates/pulsar-core/src/pdk/daemon.rs
@@ -1,3 +1,5 @@
use std::fmt;

use semver::Version;
use serde::{Deserialize, Serialize};
use thiserror::Error;
@@ -150,11 +152,23 @@ impl PulsarDaemonHandle
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ModuleStatus {
    Created,
    Running,
    Running(Vec<String>),
    Failed(String),
    Stopped,
}

impl fmt::Display for ModuleStatus {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            ModuleStatus::Running(warnings) if !warnings.is_empty() => {
                write!(f, "Running([\"{}\"])", warnings.join("\",\""))
            }
            ModuleStatus::Running(_) => write!(f, "Running"),
            _ => write!(f, "{:?}", self),
        }
    }
}

/// Messages used for internal communication between [`PulsarDaemonHandle`] and the underlying PulsarDaemon actor.
pub enum PulsarDaemonCommand {
    ModulesList {
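
For reference, the new `Display` implementation keeps the bare `Running` label when the warning list is empty, expands it to a `Running([...])` form otherwise, and falls back to the `Debug` representation for the other variants. A minimal sketch of the expected rendering, using `ModuleStatus` from `pulsar_core::pdk` (the warning and error strings here are made up for illustration):

```rust
use pulsar_core::pdk::ModuleStatus;

fn main() {
    // No warnings: only the variant name is shown.
    let healthy = ModuleStatus::Running(Vec::new());
    assert_eq!(healthy.to_string(), "Running");

    // With warnings: the strings are embedded in the status label.
    let degraded = ModuleStatus::Running(vec![
        "lost 3 events".to_string(),
        "buffer nearly full".to_string(),
    ]);
    assert_eq!(
        degraded.to_string(),
        r#"Running(["lost 3 events","buffer nearly full"])"#
    );

    // Other variants fall back to the Debug representation.
    let failed = ModuleStatus::Failed("bpf load error".to_string());
    assert_eq!(failed.to_string(), r#"Failed("bpf load error")"#);
}
```
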
4 changes: 2 additions & 2 deletions crates/pulsar-core/src/pdk/mod.rs
@@ -14,7 +14,7 @@
//! The [`ModuleContext`] is the entrypoint to access all the functions available to the module. It provides instances of:
//! - [`ModuleSender`] to send events
//! - [`ModuleReceiver`] to receive events
//! - [`ErrorSender`] to raise unrecoverable errors
//! - [`ModuleSignal`] to send signals, ex. raise unrecoverable errors, add warnings
//! - [`tokio::sync::watch::Receiver`] to get the configuration
//!
//! Check specific structs for more informations.
@@ -26,7 +26,7 @@
//! ```
//! use pulsar_core::pdk::{
//! ModuleContext, Payload, PulsarModule, PulsarModuleTask, Version, CleanExit,
//! ShutdownSignal, ModuleError,
//! ShutdownSignal, ModuleError, ModuleSignal,
//! };
//! use tokio::time::{sleep, Duration};
//!
18 changes: 15 additions & 3 deletions crates/pulsar-core/src/pdk/module.rs
@@ -149,16 +149,21 @@ pub struct ModuleSender
    pub(crate) tx: Bus,
    pub(crate) module_name: ModuleName,
    pub(crate) process_tracker: ProcessTrackerHandle,
    pub(crate) error_sender: ErrorSender,
    pub(crate) signal_sender: SignalSender,
}

/// Raises unrecoverable errors from the module to the upper layer.
///
/// Sending an error leads to a graceful shutdown of the module after [issue #7](https://github.com/Exein-io/pulsar/issues/7)
/// will be closed.
pub type ErrorSender = mpsc::Sender<ModuleError>;
pub type SignalSender = mpsc::Sender<ModuleSignal>;
pub type ModuleError = Box<dyn std::error::Error + Send + Sync + 'static>;

pub enum ModuleSignal {
    Warning(String),
    Error(ModuleError),
}

impl ModuleSender {
    /// Send an event to the [`Bus`].
    pub fn send(&self, process: Pid, timestamp: Timestamp, payload: Payload) {
@@ -261,7 +266,14 @@ impl ModuleSender
        // We don't want to make raise_error async, so we use try_send and ignore
        // the error. Since errors are fatal, it's not a problem to lose one when
        // the buffer is full.
        let _ = self.error_sender.try_send(err);
        let _ = self.signal_sender.try_send(ModuleSignal::Error(err));
    }

    pub async fn raise_warning(&self, warning: String) {
        let _ = self
            .signal_sender
            .send(ModuleSignal::Warning(warning))
            .await;
    }
}

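To make the channel mechanics concrete, here is a self-contained sketch (stand-in types, not the crate's own) of the pattern this file introduces: a single `mpsc` channel carries `ModuleSignal`, fatal errors go through `try_send` so `raise_error` can stay synchronous and may drop a signal when the buffer is full, while warnings use an awaited `send` so they are not lost. The warning and error strings are illustrative.

```rust
use tokio::sync::mpsc;

type ModuleError = Box<dyn std::error::Error + Send + Sync + 'static>;

// Stand-in for pulsar_core::pdk::ModuleSignal.
enum ModuleSignal {
    Warning(String),
    Error(ModuleError),
}

// Stand-in for the signal-sending half of ModuleSender.
struct Sender {
    signal_sender: mpsc::Sender<ModuleSignal>,
}

impl Sender {
    // Synchronous: a full buffer drops the error, which is tolerable
    // because errors are fatal anyway.
    fn raise_error(&self, err: ModuleError) {
        let _ = self.signal_sender.try_send(ModuleSignal::Error(err));
    }

    // Asynchronous: waits for buffer space so warnings are not silently lost.
    async fn raise_warning(&self, warning: String) {
        let _ = self.signal_sender.send(ModuleSignal::Warning(warning)).await;
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    let sender = Sender { signal_sender: tx };

    sender.raise_warning("kernel older than expected".to_string()).await;
    sender.raise_error("probe attach failed".into());
    drop(sender); // close the channel so the receiver loop ends

    while let Some(signal) = rx.recv().await {
        match signal {
            ModuleSignal::Warning(w) => println!("warning: {w}"),
            ModuleSignal::Error(e) => println!("fatal: {e}"),
        }
    }
}
```

Carrying both cases on one channel is what lets the module manager multiplex errors and warnings in a single `select!` branch, as shown in `module_manager.rs` below.
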
10 changes: 5 additions & 5 deletions crates/pulsar-core/src/pdk/module_context.rs
@@ -6,7 +6,7 @@ use tokio::sync::{broadcast, watch};

use crate::{
    bus::Bus,
    pdk::{ErrorSender, ModuleConfig, ModuleReceiver, ModuleSender, PulsarDaemonHandle},
    pdk::{ModuleConfig, ModuleReceiver, ModuleSender, PulsarDaemonHandle, SignalSender},
};

use super::{process_tracker::ProcessTrackerHandle, ConfigError, ModuleError, ModuleName};
@@ -17,7 +17,7 @@ pub struct ModuleContext
    module_name: ModuleName,
    cfg: watch::Receiver<ModuleConfig>,
    bus: Bus,
    error_sender: ErrorSender,
    signal_sender: SignalSender,
    daemon_handle: PulsarDaemonHandle,
    process_tracker: ProcessTrackerHandle,
    bpf_context: BpfContext,
@@ -29,7 +29,7 @@ impl ModuleContext
        cfg: watch::Receiver<ModuleConfig>,
        bus: Bus,
        module_name: ModuleName,
        error_sender: ErrorSender,
        signal_sender: SignalSender,
        daemon_handle: PulsarDaemonHandle,
        process_tracker: ProcessTrackerHandle,
        bpf_context: BpfContext,
@@ -38,7 +38,7 @@
            cfg,
            bus,
            module_name,
            error_sender,
            signal_sender,
            daemon_handle,
            process_tracker,
            bpf_context,
@@ -90,7 +90,7 @@ impl ModuleContext
            tx: self.bus.get_sender(),
            module_name: self.module_name.to_owned(),
            process_tracker: self.process_tracker.clone(),
            error_sender: self.error_sender.clone(),
            signal_sender: self.signal_sender.clone(),
        }
    }

5 changes: 3 additions & 2 deletions src/pulsar/term_print.rs
@@ -36,12 +36,13 @@ impl TermPrintable for Vec<ModuleOverview>
        for module in sorted {
            let status_color = match module.status {
                ModuleStatus::Created => Color::White,
                ModuleStatus::Running => Color::Green,
                ModuleStatus::Running(ref warnings) if warnings.is_empty() => Color::Green,
                ModuleStatus::Running(_) => Color::Yellow,
                ModuleStatus::Failed(_) => Color::Red,
                ModuleStatus::Stopped => Color::Yellow,
            };

            let status = format!("{:?}", module.status);
            let status = format!("{}", module.status);

            table.add_row(vec![
                Cell::new(module.name)
41 changes: 25 additions & 16 deletions src/pulsard/module_manager.rs
@@ -4,8 +4,8 @@ use bpf_common::program::BpfContext;
use pulsar_core::bus::Bus;
use pulsar_core::pdk::process_tracker::ProcessTrackerHandle;
use pulsar_core::pdk::{
    ModuleConfig, ModuleContext, ModuleError, ModuleStatus, PulsarDaemonHandle, PulsarModuleTask,
    ShutdownSender, ShutdownSignal, TaskLauncher,
    ModuleConfig, ModuleContext, ModuleError, ModuleSignal, ModuleStatus, PulsarDaemonHandle,
    PulsarModuleTask, ShutdownSender, ShutdownSignal, TaskLauncher,
};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::task::JoinHandle;
@@ -29,8 +29,8 @@ enum ModuleManagerCommand
///
/// Once started, the running module will be managed through its [`PulsarModuleTask`] implementation.
pub struct ModuleManager {
    tx_err: mpsc::Sender<ModuleError>,
    rx_err: mpsc::Receiver<ModuleError>,
    tx_sig: mpsc::Sender<ModuleSignal>,
    rx_sig: mpsc::Receiver<ModuleSignal>,
    rx_cmd: mpsc::Receiver<ModuleManagerCommand>,
    daemon_handle: PulsarDaemonHandle,
    process_tracker: ProcessTrackerHandle,
@@ -53,10 +53,10 @@ impl ModuleManager
        process_tracker: ProcessTrackerHandle,
        bpf_context: BpfContext,
    ) -> Self {
        let (tx_err, rx_err) = mpsc::channel(8);
        let (tx_sig, rx_sig) = mpsc::channel(8);
        Self {
            tx_err,
            rx_err,
            tx_sig,
            rx_sig,
            rx_cmd,
            task_launcher,
            bus,
@@ -73,7 +73,7 @@
    ///
    /// It will stop the module through [`PulsarModuleTask::stop`] method.
    async fn handle_module_error(&mut self, err: ModuleError) {
        if let ModuleStatus::Running = self.status {
        if let ModuleStatus::Running(_) = self.status {
            let (tx_shutdown, task) = self.running_task.take().unwrap();
            tx_shutdown.send_signal();
            let result = task.await;
@@ -117,7 +117,7 @@ impl ModuleManager {
        match cmd {
            ModuleManagerCommand::StartModule { tx_reply } => {
                // Check if the module is already running
                if let ModuleStatus::Running = self.status {
                if let ModuleStatus::Running(_) = self.status {
                    let _ = tx_reply.send(());
                    return;
                }
@@ -126,7 +126,7 @@
                    self.config.clone(),
                    self.bus.clone(),
                    self.task_launcher.name().clone(),
                    self.tx_err.clone(),
                    self.tx_sig.clone(),
                    self.daemon_handle.clone(),
                    self.process_tracker.clone(),
                    self.bpf_context.clone(),
@@ -135,17 +135,17 @@

                let module: Pin<Box<PulsarModuleTask>> =
                    self.task_launcher.run(ctx, rx_shutdown).into();
                let tx_err = self.tx_err.clone();
                let tx_sig = self.tx_sig.clone();

                // Check error and forward to this module manager actor
                let join_handle = tokio::spawn(async move {
                    if let Err(err) = module.await {
                        let _ = tx_err.send(err).await;
                        let _ = tx_sig.send(ModuleSignal::Error(err)).await;
                    }
                });

                self.running_task = Some((tx_shutdown, join_handle));
                self.status = ModuleStatus::Running;
                self.status = ModuleStatus::Running(Vec::new());

                // The `let _ =` ignores any errors when sending.
                //
@@ -156,7 +156,7 @@
            ModuleManagerCommand::StopModule { tx_reply } => {
                let result = match self.status {
                    ModuleStatus::Created | ModuleStatus::Stopped => Ok(()),
                    ModuleStatus::Running => {
                    ModuleStatus::Running(_) => {
                        let (tx_shutdown, task) = self.running_task.take().unwrap();
                        tx_shutdown.send_signal();
                        let result = task.await;
@@ -207,12 +207,18 @@ impl ModuleManager {
            }
        }
    }

    pub fn add_warning(&mut self, warning: String) {
        if let ModuleStatus::Running(ref mut warnings) = &mut self.status {
            warnings.push(warning);
        }
    }
}

impl Drop for ModuleManager {
    /// Stop the task when dropped
    fn drop(&mut self) {
        if let ModuleStatus::Running = self.status {
        if let ModuleStatus::Running(_) = self.status {
            self.running_task.take().unwrap().0.send_signal();
        }
    }
@@ -299,7 +305,10 @@ pub fn create_module_manager(
async fn run_module_manager_actor(mut actor: ModuleManager) {
    loop {
        tokio::select!(
            Some(err) = actor.rx_err.recv() => actor.handle_module_error(err).await,
            Some(sig) = actor.rx_sig.recv() => match sig {
                ModuleSignal::Error(err) => actor.handle_module_error(err).await,
                ModuleSignal::Warning(warn) => actor.add_warning(warn),
            },
            cmd = actor.rx_cmd.recv() => match cmd {
                Some(cmd) => actor.handle_cmd(cmd).await,
                None => return
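
A reduced sketch of how the actor accumulates warnings into `ModuleStatus::Running`: it uses a single channel and simplified stand-in types rather than the real `ModuleManager` and its `select!` loop, and the warning strings are illustrative.

```rust
use tokio::sync::mpsc;

// Simplified stand-ins for the daemon types.
#[derive(Debug)]
enum Status {
    Running(Vec<String>),
    Failed(String),
}

enum Signal {
    Warning(String),
    Error(String),
}

// Reduced version of the manager loop: drain signals until the channel
// closes, pushing warnings into the Running status and failing on errors.
async fn run_actor(mut rx: mpsc::Receiver<Signal>) -> Status {
    let mut status = Status::Running(Vec::new());
    while let Some(signal) = rx.recv().await {
        match signal {
            Signal::Warning(w) => {
                // Warnings are only tracked while the module is running.
                if let Status::Running(ref mut warnings) = status {
                    warnings.push(w);
                }
            }
            Signal::Error(e) => status = Status::Failed(e),
        }
    }
    status
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    let actor = tokio::spawn(run_actor(rx));

    tx.send(Signal::Warning("warning one".into())).await.unwrap();
    tx.send(Signal::Warning("warning two".into())).await.unwrap();
    drop(tx); // closing the channel lets the actor return its final status

    // Prints: Running(["warning one", "warning two"])
    println!("{:?}", actor.await.unwrap());
}
```

Because warnings live on the status itself, the CLI table in `term_print.rs` can surface them through the `Display` impl added in `daemon.rs` without any extra bookkeeping.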