Skip to content

Commit

Permalink
Add syslog capabilities
Browse files Browse the repository at this point in the history
  • Loading branch information
azenna authored and banditopazzo committed Nov 4, 2023
1 parent 7c48535 commit 52071af
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/modules/logger/Cargo.toml
Expand Up @@ -11,4 +11,3 @@ bpf-common = { workspace = true }

tokio = { workspace = true, features = ["full"] }
log = { workspace = true }
chrono = { workspace = true, features = ["std"] }
2 changes: 2 additions & 0 deletions crates/modules/logger/README.md
Expand Up @@ -7,13 +7,15 @@ This module will log Pulsar threat events to stdout.
|Config|Type|Description|
|------|----|-----------|
|console|bool|log to stdout|
|syslog|bool|log to syslog|

Default configuration:

```ini
[logger]
enabled=true
console=true
syslog=true
```

You disable this module with:
Expand Down
113 changes: 78 additions & 35 deletions crates/modules/logger/src/lib.rs
Expand Up @@ -2,7 +2,17 @@ use pulsar_core::pdk::{
CleanExit, ConfigError, Event, ModuleConfig, ModuleContext, ModuleError, PulsarModule,
ShutdownSignal, Version,
};
use std::{
env,
fs::File,
io,
os::{
fd::AsFd,
unix::{fs::MetadataExt, net::UnixDatagram},
},
};

const UNIX_SOCK_PATHS: [&str; 3] = ["/dev/log", "/var/run/syslog", "/var/run/log"];
const MODULE_NAME: &str = "logger";

pub fn module() -> PulsarModule {
Expand All @@ -20,17 +30,36 @@ async fn logger_task(
) -> Result<CleanExit, ModuleError> {
let mut receiver = ctx.get_receiver();
let mut rx_config = ctx.get_config();
let mut logger = Logger::from_config(rx_config.read()?);
let sender = ctx.get_sender();

let mut logger = match Logger::from_config(rx_config.read()?) {
Ok(logr) => logr,
Err(logr) => {
sender
.raise_warning("Failed to connect to syslog".into())
.await;
logr
}
};

loop {
tokio::select! {
r = shutdown.recv() => return r,
_ = rx_config.changed() => {
logger = Logger::from_config(rx_config.read()?);
logger = match Logger::from_config(rx_config.read()?) {
Ok(logr) => logr,
Err(logr) => {
sender.raise_warning("Failed to connect to syslog".into()).await;
logr
}
}
}
msg = receiver.recv() => {
let msg = msg?;
logger.process(&msg)
if let Err(e) = logger.process(&msg) {
sender.raise_warning(format!("Writing to syslog failed: {e}")).await;
logger = Logger { syslog: None, ..logger };
}
},
}
}
Expand All @@ -40,7 +69,7 @@ async fn logger_task(
struct Config {
console: bool,
// file: bool, //TODO:
// syslog: bool, //TODO:
syslog: bool,
}

impl TryFrom<&ModuleConfig> for Config {
Expand All @@ -50,51 +79,65 @@ impl TryFrom<&ModuleConfig> for Config {
Ok(Self {
console: config.with_default("console", true)?,
// file: config.required("file")?,
// syslog: config.required("syslog")?,
syslog: config.with_default("syslog", true)?,
})
}
}

#[derive(Debug)]
struct Logger {
console: bool,
syslog: Option<UnixDatagram>,
}

impl Logger {
fn from_config(rx_config: Config) -> Self {
let Config { console } = rx_config;
Self { console }
}
fn from_config(rx_config: Config) -> Result<Self, Self> {
let Config { console, syslog } = rx_config;

let connected_to_journal = io::stderr()
.as_fd()
.try_clone_to_owned()
.and_then(|fd| File::from(fd).metadata())
.map(|meta| format!("{}:{}", meta.dev(), meta.ino()))
.ok()
.and_then(|stderr| {
env::var_os("JOURNAL_STREAM").map(|s| s.to_string_lossy() == stderr.as_str())
})
.unwrap_or(false);

let opt_sock = (syslog && !connected_to_journal)
.then(|| {
let sock = UnixDatagram::unbound().ok()?;
UNIX_SOCK_PATHS
.iter()
.find_map(|path| sock.connect(path).ok())
.map(|_| sock)
})
.flatten();

fn process(&self, event: &Event) {
if event.header().threat.is_some() && self.console {
terminal::print_event(event);
if syslog && opt_sock.is_none() {
Err(Self {
console,
syslog: opt_sock,
})
} else {
Ok(Self {
console,
syslog: opt_sock,
})
}
}
}

pub mod terminal {
use chrono::{DateTime, Utc};
use pulsar_core::{event::Threat, pdk::Event};

pub fn print_event(event: &Event) {
let header = event.header();
let time = DateTime::<Utc>::from(header.timestamp).format("%Y-%m-%dT%TZ");
let image = &header.image;
let pid = &header.pid;
let payload = event.payload();
fn process(&mut self, event: &Event) -> io::Result<()> {
if event.header().threat.is_some() {
if self.console {
println!("{:#}", event);
}

if let Some(Threat {
source,
description,
extra: _,
}) = &event.header().threat
{
println!(
"[{time} \x1b[1;30;43mTHREAT\x1b[0m {image} ({pid})] [{source} - {description}] {payload}"
)
} else {
let source = &header.source;
println!("[{time} \x1b[1;30;46mEVENT\x1b[0m {image} ({pid})] [{source}] {payload}")
if let Some(ref mut syslog) = &mut self.syslog {
syslog.send(format!("{}", event).as_bytes())?;
}
}
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/pulsar-core/Cargo.toml
Expand Up @@ -8,7 +8,6 @@ repository.workspace = true
[dependencies]
bpf-common = { path = "../bpf-common" }
validatron = { path = "../validatron" }

serde = { workspace = true, features = ["derive"] }
toml_edit = { workspace = true, features = ["easy"] }
tokio = { workspace = true, features = ["full"] }
Expand All @@ -18,3 +17,4 @@ log = { workspace = true }
thiserror = { workspace = true }
nix = { workspace = true }
strum = { workspace = true, features = ["derive"] }
chrono = { workspace = true, features = ["std"] }
37 changes: 37 additions & 0 deletions crates/pulsar-core/src/event.rs
Expand Up @@ -4,6 +4,7 @@ use std::{
time::SystemTime,
};

use chrono::{DateTime, Utc};
use serde::{de::DeserializeOwned, ser, Deserialize, Serialize};
use strum::{EnumDiscriminants, EnumString};
use validatron::{Operator, Validatron, ValidatronError};
Expand All @@ -29,6 +30,42 @@ impl Event {
}
}

impl fmt::Display for Event {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let header = self.header();
let time = DateTime::<Utc>::from(header.timestamp).format("%Y-%m-%dT%TZ");
let image = &header.image;
let pid = &header.pid;
let payload = self.payload();

if let Some(Threat {
source,
description,
extra: _,
}) = &self.header().threat
{
if f.alternate() {
writeln!(f, "[{time} \x1b[1;30;43mTHREAT\x1b[0m {image} ({pid})] [{source} - {description}] {payload}")
} else {
writeln!(
f,
"[{time} THREAT {image} ({pid})] [{source} - {description}] {payload}"
)
}
} else {
let source = &header.source;
if f.alternate() {
writeln!(
f,
"[{time} \x1b[1;30;46mEVENT\x1b[0m {image} ({pid})] [{source}] {payload}"
)
} else {
writeln!(f, "[{time} EVENT {image} ({pid})] [{source}] {payload}")
}
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize, Validatron)]
pub struct Header {
pub image: String,
Expand Down
2 changes: 1 addition & 1 deletion src/pulsar/mod.rs
Expand Up @@ -63,7 +63,7 @@ pub async fn pulsar_cli_run(options: &PulsarCliOpts) -> Result<()> {
match ws_read {
Ok(event) => {
if *all || event.header().threat.is_some() {
logger::terminal::print_event(&event)
println!("{:#}", event);
}
}
Err(e) => return Err(e).context("error reading from websocket"),
Expand Down

0 comments on commit 52071af

Please sign in to comment.