Skip to content
This repository has been archived by the owner on Sep 14, 2019. It is now read-only.

Commit

Permalink
Merge PR awslabs#24 from upstream (#4) (#10)
Browse files Browse the repository at this point in the history
Merge PR awslabs#24 from upstream (#4)
  • Loading branch information
crisidev committed Jul 19, 2019
1 parent f877853 commit e1e6dd4
Show file tree
Hide file tree
Showing 8 changed files with 372 additions and 1 deletion.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ tls = ["openssl"]
gelf = ["serde", "serde_json"]
ltsv = []
syslog = []
file = []
file = ["notify", "glob"]

[build-dependencies.capnpc]
version = "*"
Expand All @@ -27,8 +27,10 @@ chrono = "0.4"
clap = "2"
coio = { git = "https://github.com/zonyitoo/coio-rs", optional = true }
flate2 = "1.0"
glob = { version = "0.3", optional = true }
kafka = { version = "0.7", features = ["snappy", "gzip", "security"], optional = true }
log = "0.4"
notify = { version = "4.0", optional = true }
openssl = { version = "~0.9", optional = true }
rand = "0.5"
redis = { version = "0.10", optional = true }
Expand Down
4 changes: 4 additions & 0 deletions flowgger.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
### Standard input
# type = "stdin"

### File input
# type = "file"
# src = "/var/lib/docker/containers/*/*.log"

### Syslog over UDP
# type = "udp"
# listen = "0.0.0.0:514"
Expand Down
182 changes: 182 additions & 0 deletions src/flowgger/input/file/discovery.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
use std::path::{Path, PathBuf};
use std::sync::mpsc::{channel, Receiver, SyncSender};
use std::thread;
use std::time::Duration;

use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher};

use glob::{glob, Pattern};

use crate::flowgger::decoder::Decoder;
use crate::flowgger::encoder::Encoder;
use crate::flowgger::input::file::worker::FileWorker;

/// Discovers files matching a glob pattern and starts a `FileWorker` thread
/// for each of them, both for files that already exist and for files that
/// show up later.
pub struct FileDiscovery {
/// Filesystem watcher on which the discovered directories are registered.
watcher: RecommendedWatcher,
/// Receiving end of the channel handed to `watcher`; yields its debounced events.
event_rx: Receiver<DebouncedEvent>,
/// Compiled glob pattern built from the configured source path.
path_match: Pattern,
/// Sender cloned into every worker.
log_tx: SyncSender<Vec<u8>>,
/// Decoder prototype; each worker receives its own `clone_boxed()` copy.
decoder: Box<dyn Decoder + Send>,
/// Encoder prototype; each worker receives its own `clone_boxed()` copy.
encoder: Box<dyn Encoder + Send>,
}

impl FileDiscovery {
    /// Builds a discovery service for the files matching the glob `path_match`.
    ///
    /// Filesystem events are debounced with a one second delay.
    ///
    /// # Panics
    ///
    /// Panics if the filesystem watcher cannot be created or if `path_match`
    /// is not a valid glob pattern.
    pub fn new(
        path_match: &str,
        log_tx: SyncSender<Vec<u8>>,
        decoder: Box<dyn Decoder + Send>,
        encoder: Box<dyn Encoder + Send>,
    ) -> FileDiscovery {
        let (tx, rx) = channel();
        let watcher =
            Watcher::new(tx, Duration::from_secs(1)).expect("Cannot initialize fs watcher");

        FileDiscovery {
            watcher,
            event_rx: rx,
            path_match: Pattern::new(path_match).expect("Wrong input.src"),
            log_tx,
            decoder,
            encoder,
        }
    }

    /// Watches the relevant directories, starts workers for the files that
    /// already exist, then reacts to filesystem events.
    ///
    /// Returns only when the event channel is disconnected, i.e. when no more
    /// events can ever be delivered.
    ///
    /// NOTE(review): nothing here tracks which paths already have a worker, so
    /// a `NoticeWrite` for a file that is already being followed spawns an
    /// additional worker. Confirm whether that is intended.
    ///
    /// # Panics
    ///
    /// Panics if the initial glob expansion or an initial directory watch
    /// fails.
    pub fn run(&mut self) {
        let root = PathBuf::from(self.path_match.as_str());
        self.add_initial_watches(root);
        self.start_initial_workers();

        loop {
            let event = match self.event_rx.recv() {
                Ok(event) => event,
                Err(e) => {
                    // A receive error means every sender is gone; retrying
                    // would only spin forever printing the same message.
                    println!("Error receiving event: {}", e);
                    return;
                }
            };

            match event {
                DebouncedEvent::Create(event_path) => match event_path.metadata() {
                    Ok(ref meta) if meta.is_dir() => {
                        if should_be_watched(&self.path_match, &event_path) {
                            // The directory may already be gone again; that
                            // must not take the whole discovery loop down.
                            if let Err(e) = self
                                .watcher
                                .watch(&event_path, RecursiveMode::NonRecursive)
                            {
                                eprintln!("Cannot watch {}: {:?}", event_path.display(), e);
                            }
                        }
                    }
                    Ok(_) => {
                        if self.path_match.matches_path(&event_path) {
                            self.start_worker(&event_path, false);
                        }
                    }
                    // Events are debounced, so the path can vanish before it
                    // is inspected.
                    Err(e) => eprintln!("Cannot inspect {}: {}", event_path.display(), e),
                },
                DebouncedEvent::NoticeWrite(event_path) => {
                    if self.path_match.matches_path(&event_path) {
                        self.start_worker(&event_path, false);
                    }
                }
                other => println!("Unknown DebouncedEvent {:?}", other),
            }
        }
    }

    /// Watches every existing directory matched by `path_match` or by any of
    /// its ancestors, so that files and directories created later anywhere
    /// along the pattern are noticed.
    fn add_initial_watches(&mut self, path_match: PathBuf) {
        for ancestor in path_match.ancestors() {
            let pattern = ancestor
                .to_str()
                .expect("pattern was built from a UTF-8 string");
            for entry in glob(pattern).expect("Failed to read glob pattern") {
                match entry {
                    Ok(path) => {
                        if path.is_dir() {
                            self.add_directory_watch(&path)
                        }
                    }
                    Err(e) => panic!("Failed to read glob entry: {}", e),
                }
            }
        }
    }

    /// Starts a worker, reading from the end of the file, for every file that
    /// matches the pattern at startup.
    fn start_initial_workers(&self) {
        for entry in glob(self.path_match.as_str()).expect("Failed to read glob pattern") {
            match entry {
                Ok(path) => self.start_worker(&path, true),
                Err(e) => panic!("Failed to read glob entry: {}", e),
            };
        }
    }

    /// Registers a non-recursive watch on `path`.
    ///
    /// # Panics
    ///
    /// Panics if the watch cannot be registered.
    fn add_directory_watch(&mut self, path: &Path) {
        self.watcher
            .watch(path, RecursiveMode::NonRecursive)
            .unwrap();
    }

    /// Spawns a thread running a `FileWorker` on `path`.
    ///
    /// `from_tail` is forwarded to the worker: `true` skips the current
    /// content of the file, `false` reads it from the beginning.
    fn start_worker(&self, path: &Path, from_tail: bool) {
        let path = path.to_path_buf();
        let log_tx = self.log_tx.clone();
        let decoder: Box<dyn Decoder + Send> = self.decoder.clone_boxed();
        let encoder: Box<dyn Encoder + Send> = self.encoder.clone_boxed();
        thread::spawn(move || {
            let mut worker = FileWorker::new(&path, log_tx, decoder, encoder);
            worker.run(from_tail);
        });
    }
}

fn should_be_watched(match_path: &Pattern, path: &Path) -> bool {
if match_path.matches_path(path) {
true
} else {
match PathBuf::from(match_path.as_str()).parent() {
Some(parent) => {
should_be_watched(&Pattern::new(parent.to_str().unwrap()).unwrap(), path)
}
None => false,
}
}
}

/// Table-driven check of `should_be_watched`.
#[test]
fn test_should_be_watched() {
    // (glob pattern, candidate path, expected verdict)
    let cases = [
        ("/tmp/1.txt", "/tmp/1.txt", true),
        ("/tmp/1.txt", "/tmp/2.txt", false),
        ("/tmp/*.txt", "/tmp/1.txt", true),
        ("/tmp/*.txt", "/tmp/2.txt", true),
        ("/tmp/1.txt", "/tmp", true),
        ("/tmp/1.txt", "/tmp/logs", false),
        ("/tmp/*/1.txt", "/tmp/logs/1.txt", true),
        ("/tmp/*/1.txt", "/tmp/logs/1/1.txt", true),
    ];

    for &(pattern, path, expected) in cases.iter() {
        let match_path = Pattern::new(pattern).unwrap();
        // Both sides of the comparison are plain booleans, so the message is
        // the only way to tell which row failed.
        assert_eq!(
            expected,
            should_be_watched(&match_path, Path::new(path)),
            "pattern {:?} against path {:?}",
            pattern,
            path
        );
    }
}
42 changes: 42 additions & 0 deletions src/flowgger/input/file/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
mod discovery;
mod worker;
use self::discovery::FileDiscovery;

use std::sync::mpsc::SyncSender;

use super::Input;
use crate::flowgger::config::Config;
use crate::flowgger::decoder::Decoder;
use crate::flowgger::encoder::Encoder;

/// Settings of the file input.
#[derive(Clone)]
pub struct FileConfig {
/// Value of the `input.src` configuration key; it is handed to
/// `FileDiscovery::new` as the path pattern to follow.
src: String,
}

/// Input that feeds the pipeline from files on disk.
pub struct FileInput {
/// Settings read from the configuration when the input was created.
file_config: FileConfig,
}

impl FileInput {
    /// Builds the file input from the `input.src` configuration key.
    ///
    /// # Panics
    ///
    /// Panics if `input.src` is missing or is not a string.
    pub fn new(config: &Config) -> FileInput {
        let src = match config.lookup("input.src") {
            None => panic!("Missing file path"),
            Some(src) => src
                .as_str()
                .expect("input.src must be a string")
                .to_owned(),
        };
        FileInput {
            file_config: FileConfig { src },
        }
    }
}

impl Input for FileInput {
    /// Runs file discovery on the configured source path, handing it the
    /// output channel together with the decoder and encoder to use.
    fn accept(
        &self,
        tx: SyncSender<Vec<u8>>,
        decoder: Box<dyn Decoder + Send>,
        encoder: Box<dyn Encoder + Send>,
    ) {
        FileDiscovery::new(&self.file_config.src, tx, decoder, encoder).run();
    }
}
120 changes: 120 additions & 0 deletions src/flowgger/input/file/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use std;
use std::fs::File;
use std::io::prelude::*;
use std::io::stderr;
use std::io::{BufReader, SeekFrom};
use std::path::{Path, PathBuf};
use std::sync::mpsc::{channel, SyncSender};
use std::time::Duration;

use notify::{watcher, RecursiveMode, Watcher};

use crate::flowgger::decoder::Decoder;
use crate::flowgger::encoder::Encoder;

/// Follows a single file and forwards each of its lines, decoded and
/// re-encoded, to the output channel.
pub struct FileWorker {
/// Path of the file being followed.
path: PathBuf,
/// Channel on which the re-encoded records are sent.
tx: SyncSender<Vec<u8>>,
/// Decoder applied to every line read from the file.
decoder: Box<dyn Decoder + Send>,
/// Encoder applied to every decoded record.
encoder: Box<dyn Encoder + Send>,
}

impl FileWorker {
    /// Creates a worker that will follow the file at `path`.
    pub fn new(
        path: &Path,
        tx: SyncSender<Vec<u8>>,
        decoder: Box<dyn Decoder + Send>,
        encoder: Box<dyn Encoder + Send>,
    ) -> FileWorker {
        FileWorker {
            path: PathBuf::from(path),
            tx,
            decoder,
            encoder,
        }
    }

    /// Follows the file until it can no longer be read.
    ///
    /// Each filesystem event on the file triggers a read of every complete
    /// line that is available. A trailing partial line is kept in the buffer
    /// and completed on a later event. Lines that are not valid UTF-8 are
    /// reported on stderr and skipped.
    ///
    /// `from_tail` is forwarded to `FollowReader::new`: `true` starts at the
    /// current end of the file, `false` at its beginning.
    ///
    /// # Panics
    ///
    /// Panics if the file watcher cannot be set up or the file cannot be
    /// opened.
    pub fn run(&mut self, from_tail: bool) {
        let (tx, rx) = channel();
        let mut fs_watcher =
            watcher(tx, Duration::from_secs(2)).expect("Cannot create file watcher");
        fs_watcher
            .watch(&self.path, RecursiveMode::NonRecursive)
            .unwrap();

        let mut reader = BufReader::new(FollowReader::new(&self.path, from_tail));
        let mut buffer = Vec::new();

        let (decoder, encoder): (Box<dyn Decoder>, Box<dyn Encoder>) =
            (self.decoder.clone_boxed(), self.encoder.clone_boxed());

        // `recv` fails only once the sending side is gone for good, so an
        // error ends the worker instead of spinning on a dead channel.
        'events: while rx.recv().is_ok() {
            loop {
                match reader.read_until(b'\n', &mut buffer) {
                    // Nothing more to read for now: wait for the next event.
                    Ok(0) => break,
                    Ok(_) => {}
                    // The reader reports an error once the file is gone.
                    Err(_) => break 'events,
                }

                // Without a trailing newline the line is still being written;
                // keep what was read and let a later read complete it.
                if buffer.last() == Some(&b'\n') {
                    buffer.pop();
                    match std::str::from_utf8(&buffer) {
                        Ok(line) => {
                            if let Err(e) = handle_record(line, &self.tx, &decoder, &encoder) {
                                let _ = writeln!(stderr(), "{}: [{}]", e, line.trim());
                            }
                        }
                        Err(_) => {
                            let _ = writeln!(stderr(), "Invalid UTF-8 input");
                        }
                    }
                    buffer.clear();
                }
            }
        }
    }
}

/// `Read` adapter over a file that fails once the path it was opened from no
/// longer exists.
pub struct FollowReader {
    /// Open handle the data is read from.
    file: File,
    /// Path the handle was opened from, checked before every read.
    path: PathBuf,
}

impl FollowReader {
    /// Opens `filename` for reading.
    ///
    /// When `from_tail` is `true` the read position is moved to the end of
    /// the file, so only data appended afterwards is returned.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be opened or the seek fails.
    pub fn new(filename: &Path, from_tail: bool) -> FollowReader {
        let mut file = File::open(filename).expect("Failed to open file");
        if from_tail {
            file.seek(SeekFrom::End(0))
                .expect("Failed to seek to end of file");
        }
        FollowReader {
            file,
            path: filename.to_path_buf(),
        }
    }
}

impl Read for FollowReader {
    /// Reads from the underlying file.
    ///
    /// # Errors
    ///
    /// Returns an error when the path no longer exists, or when syncing or
    /// reading the file fails.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        if !self.path.exists() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::Other,
                format!("{} no longer exists", self.path.display()),
            ));
        }
        // Report a failed sync to the caller like any other I/O error.
        self.file.sync_data()?;
        self.file.read(buf)
    }
}

/// Decodes `line`, re-encodes the resulting record and sends the bytes on
/// `tx`.
///
/// # Errors
///
/// Returns the error reported by the decoder or by the encoder.
///
/// # Panics
///
/// Panics if the record cannot be sent on `tx`.
fn handle_record(
    line: &str,
    tx: &SyncSender<Vec<u8>>,
    decoder: &Box<dyn Decoder>,
    encoder: &Box<dyn Encoder>,
) -> Result<(), &'static str> {
    let payload = decoder
        .decode(line)
        .and_then(|record| encoder.encode(record))?;
    tx.send(payload).unwrap();
    Ok(())
}
4 changes: 4 additions & 0 deletions src/flowgger/input/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(feature = "file")]
mod file;
#[cfg(feature = "redis-input")]
mod redis_input;
mod stdin_input;
Expand All @@ -7,6 +9,8 @@ mod tls;
#[cfg(feature = "syslog")]
mod udp_input;

#[cfg(feature = "file")]
pub use self::file::FileInput;
#[cfg(feature = "redis-input")]
pub use self::redis_input::RedisInput;
pub use self::stdin_input::StdinInput;
Expand Down
Loading

0 comments on commit e1e6dd4

Please sign in to comment.