Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
A simple Rust Application to wrap custom operations on top of notify crate.
Feel Free to explore and make sure to follow or contribute and improve this code.

Notify Crate : https://crates.io/crates/notify
Notify Crate : <https://crates.io/crates/notify>

All process logs are saved in `process.log`

Cheers!!
108 changes: 99 additions & 9 deletions src/core/process_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,79 @@
use core::f64;
use std::collections::{HashMap, HashSet, VecDeque};
use std::error::Error;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::thread::sleep;
use std::sync::mpsc;
use std::thread::{sleep, JoinHandle};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use chrono::{DateTime, Utc};
use regex::Regex;
use sysinfo::{Pid, Process, System};

const BUFFER_SIZE: usize = 30;

fn start_buffered_logger(
rx: mpsc::Receiver<String>,
filename: String,
) -> std::thread::JoinHandle<()> {
std::thread::spawn(move || {
let mut log_file = match File::options().append(true).create(true).open(&filename) {
Ok(file) => file,
Err(e) => {
eprintln!("Failed to open log file {}: {:?}", filename, e);
return;
}
};

let mut log_buffer = Vec::new();
let buffer_size = BUFFER_SIZE as usize;

// Continuously receive logs and write them to file
for log in rx {
log_buffer.push(log);

// Flush when buffer is full (15 logs)
if log_buffer.len() >= buffer_size {
for buffered_log in &log_buffer {
if let Err(e) = writeln!(&mut log_file, "{}", buffered_log) {
eprintln!("Failed to write to log file {}: {:?}", filename, e);
}
}
// Flush after writing all buffered logs
if let Err(e) = log_file.flush() {
eprintln!("Failed to flush log file {}: {:?}", filename, e);
}
log_buffer.clear();
}
}

// Flush any remaining logs in buffer when channel closes
if !log_buffer.is_empty() {
for buffered_log in &log_buffer {
if let Err(e) = writeln!(&mut log_file, "{}", buffered_log) {
eprintln!("Failed to write to log file {}: {:?}", filename, e);
}
}
if let Err(e) = log_file.flush() {
eprintln!("Failed to flush log file {}: {:?}", filename, e);
}
}
})
}

impl Drop for ProcessMonitor {
fn drop(&mut self) {
// The log_tx will be dropped automatically, closing the channel
// and signaling the logging thread to stop

// Take ownership of the thread handle and join
if let Some(handle) = self.log_thread.take() {
let _ = handle.join();
}
}
}
#[derive(Debug, Clone)]
pub struct ProcessInfo {
process_name: String,
Expand Down Expand Up @@ -59,7 +124,7 @@ pub struct Alert {
}

pub trait AlertHandler {
fn handle_alert(&self, alert: &Alert) -> Result<(), Box<dyn Error>>;
fn handle_alert(&self, alert: &Alert, tx: mpsc::Sender<String>) -> Result<(), Box<dyn Error>>;
fn get_readable_time(&self, timestamp: f64) -> DateTime<Utc>;
}

Expand All @@ -74,15 +139,25 @@ impl AlertHandler for ConsoleAlertsHandler {
DateTime::from_timestamp(seconds, nanoseconds).unwrap()
}

fn handle_alert(&self, alert: &Alert) -> Result<(), Box<dyn Error>> {
println!(
fn handle_alert(&self, alert: &Alert, tx: mpsc::Sender<String>) -> Result<(), Box<dyn Error>> {
let time = self.get_readable_time(alert.timestamp);
let log = format!(
"==========\nTimeStamp :: {:?}\n Severity :: {:?}\n Process Name :: {:?} \ndetails :: {:?}\n More :: {:?}\n=============\n",
self.get_readable_time(alert.timestamp),
time,
alert.severity,
alert.process_name,
alert.detail,
alert.alert_type
);

// Console output (non-blocking)
println!("{log}");

// Send log to channel for parallel file writing (non-blocking)
if let Err(e) = tx.send(log.clone()) {
eprintln!("Failed to send log to file writer: {:?}", e);
}

Ok(())
}
}
Expand Down Expand Up @@ -216,17 +291,27 @@ pub struct ProcessMonitor {
alert_handlers: Vec<Box<dyn AlertHandler>>,
system: System,
previous_processes: HashMap<u32, ProcessInfo>,
log_tx: mpsc::Sender<String>,
// Wrapping a thread handle in Option so that it can be closed while implementing the Drop
// trait and `.join()` takes the ownership of the thread
log_thread: Option<std::thread::JoinHandle<()>>,
}

impl ProcessMonitor {
pub fn new(interval: Duration, cpu_threshold: f32, mem_threshold: f64, bound: usize) -> Self {
let (log_tx, log_rx) = mpsc::channel::<String>();
let log_thread: Option<JoinHandle<()>> =
Some(start_buffered_logger(log_rx, "process.log".into()));

let mut process_monitor = ProcessMonitor {
scan_interval: interval,
resource_monitor: ResourceMonitor::new(cpu_threshold, mem_threshold, bound),
threat_detector: ThreatDetector::new(),
alert_handlers: Vec::new(),
system: System::new_all(),
previous_processes: HashMap::new(),
log_tx,
log_thread,
};

process_monitor.add_alert_handlers(Box::new(ConsoleAlertsHandler));
Expand All @@ -236,21 +321,26 @@ impl ProcessMonitor {
}

pub fn send_alert(&self, alert: &Alert) {
// Send alert to all handlers (non-blocking)
for handler in &self.alert_handlers {
if let Err(e) = handler.handle_alert(alert) {
eprintln!("Error occured while handling alerts :: {:?}", e);
if let Err(e) = handler.handle_alert(alert, self.log_tx.clone()) {
eprintln!("Error occurred while handling alerts :: {:?}", e);
let alert = Alert {
severity: AlertSeverity::High,
alert_type: AlertType::SystemAlert {
message: format!("Error occured !!"),
message: format!("Error occurred !!"),
},
detail: format!("Error occured while handling alerts :: {:?}", e),
detail: format!("Error occurred while handling alerts :: {:?}", e),
timestamp: self.get_timestamp(),
process_name: String::from("NA"),
};

self.send_alert(&alert);
}
}

// Note: The file_writer_thread will continue running and processing logs
// even after this function returns. It will be cleaned up when the main program exits.
}

pub fn add_alert_handlers(&mut self, alert_handler: Box<dyn AlertHandler>) {
Expand Down