Skip to content

Commit

Permalink
feat: adds non-block watchers for long running tasks (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
cristianoliveira committed Jun 9, 2023
1 parent a6c710a commit ffdc34a
Show file tree
Hide file tree
Showing 13 changed files with 431 additions and 7 deletions.
22 changes: 22 additions & 0 deletions examples/long-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
- name: run long task 2
run: "bash examples/longtask.sh long 2"
change: ["src/**", "tests/integration/workdir/**"]

- name: run long task 3
run: "bash examples/longtask.sh short 3"
change: ["src/**", "tests/integration/workdir/**"]

- name: run long task 4
run: "bash examples/longtask.sh short 4"
change: ["src/**", "tests/integration/workdir/**"]
run_on_init: true

- name: run long task 5
run: "bash examples/longtask.sh short 5"
change: ["src/**", "tests/integration/workdir/**"]
run_on_init: true

- name: run long task 6
run: ["bash examples/longtask.sh list 3", "bash examples/longtask.sh list 4"]
change: ["src/**", "tests/integration/workdir/**"]
run_on_init: true
15 changes: 15 additions & 0 deletions examples/longtask.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env bash

# Simulate a long task and count time
count=0
echo "Started task $1 $2"
while true; do
echo "Long task running... $count"
count=$((count+1))
sleep 3

if [ $count -eq "$2" ]; then
echo "Task $1 finished"
break
fi
done
2 changes: 2 additions & 0 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
pub mod init;
pub mod run;
pub mod watch;
pub mod watch_non_block;

pub use self::run::RunCommand;
pub use cli::init::InitCommand;
pub use cli::watch::WatchCommand;
pub use cli::watch_non_block::WatchNonBlockCommand;

/// # Command interface
///
Expand Down
5 changes: 4 additions & 1 deletion src/cli/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ impl WatchCommand {
stdout::verbose(&format!("struct command: {:?}", command));
};
stdout::info(&format!("----- command: {} -------", command));
cmd::execute(String::from(command))?
if let Err(err) = cmd::execute(String::from(command)) {
stdout::error(&format!("failed to run command: {:?}", err));
return Err(err);
}
}
Ok(())
}
Expand Down
100 changes: 100 additions & 0 deletions src/cli/watch_non_block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
extern crate notify;

use std::sync::mpsc::channel;
use std::sync::mpsc::TryRecvError;

use self::notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher};
use std::thread;
use std::time::Duration;

use cli::Command;
use stdout;
use watches::Watches;
use workers;

/// # `WatchNonBlockCommand`
///
/// Starts watcher to listen the change events configured
/// in watch.yaml, upon change it cancell all previous tasks, including the running
/// task and initiate a new set of tasks.
///
pub struct WatchNonBlockCommand {
watches: Watches,
verbose: bool,
}

impl WatchNonBlockCommand {
pub fn new(watches: Watches, verbose: bool) -> Self {
if verbose {
stdout::verbose(&format!("watches {:?}", watches));
}

WatchNonBlockCommand { watches, verbose }
}
}

impl Command for WatchNonBlockCommand {
fn execute(&self) -> Result<(), String> {
if self.verbose {
stdout::verbose(&format!("Verbose mode enabled."));
};

let (tx, rx) = channel();
let mut watcher: RecommendedWatcher =
Watcher::new(tx, Duration::from_secs(2)).expect("Unable to create watcher");

if let Err(err) = watcher.watch(".", RecursiveMode::Recursive) {
return Err(format!("Unable to watch current directory {:?}", err));
}

let worker = workers::Worker::new(self.verbose);

if let Some(rules) = self.watches.run_on_init() {
stdout::info(&format!("Running on init commands."));
if let Err(err) = worker.schedule(rules.clone()) {
stdout::error(&format!("failed to initiate next run: {:?}", err));
}
}

stdout::info(&format!("Watching..."));
loop {
match rx.try_recv() {
Ok(event) => {
if let DebouncedEvent::Create(path) = event {
let path_str = path.into_os_string().into_string().unwrap();

if self.verbose {
stdout::verbose(&format!("Changed file: {}", path_str));
};

if let Some(rules) = self.watches.watch(&*path_str) {
if self.verbose {
stdout::verbose(&format!("Triggered by change in: {}", path_str));
};

if let Err(err) = worker.cancel_running_tasks() {
stdout::error(&format!(
"failed to cancel current running tasks: {:?}",
err
));
}

if let Err(err) = worker.schedule(rules.clone()) {
stdout::error(&format!("failed to initiate next run: {:?}", err));
}
}
}
}

Err(err) => {
if err != TryRecvError::Empty {
stdout::error(&format!("failed to receive event: {:?}", err));
break;
}
}
}
}

Ok(())
}
}
26 changes: 25 additions & 1 deletion src/cmd.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::process::Command;
use std::process::{Child, Command};
use std::sync::mpsc::channel;
use std::sync::mpsc::Sender;
use std::thread::JoinHandle;
use stdout;

pub fn execute(command_line: String) -> Result<(), String> {
let shell = std::env::var("SHELL").unwrap_or(String::from("/bin/sh"));
Expand All @@ -16,6 +20,26 @@ pub fn execute(command_line: String) -> Result<(), String> {
};
}

pub fn spawn_command(command_line: String) -> Result<Child, String> {
let shell = std::env::var("SHELL").unwrap_or(String::from("/bin/sh"));
let mut cmd = Command::new(shell);

match cmd.arg("-c").arg(command_line).spawn() {
Ok(child) => Ok(child),
Err(err) => Err(format!("Error while creating command {}", err)),
}
}

#[test]
fn it_spawn_a_command_returning_a_child_ref() {
let result = match spawn_command(String::from("echo 'foo'")) {
Ok(mut child) => child.wait().expect("fail to wait"),
Err(err) => panic!("{:?}", err),
};

assert_eq!(format!("{}", result), "exit status: 0")
}

#[test]
fn it_executes_a_command() {
let result = match execute(String::from("echo 'foo'")) {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ pub mod cmd;
pub mod rules;
pub mod stdout;
pub mod watches;
pub mod workers;
pub mod yaml;
23 changes: 20 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod cmd;
mod rules;
mod stdout;
mod watches;
mod workers;
mod yaml;

use cli::*;
Expand Down Expand Up @@ -42,6 +43,7 @@ Options:
<command> Run an arbitrary command for current folder.
--config=<cfgfile> Use given config file.
--target=<task> Execute only the given task target.
-n --non-block Execute tasks and cancel them if a new event is received.
-h --help Shows this message.
-v --version Shows version.
-V Use verbose output.
Expand All @@ -63,6 +65,7 @@ pub struct Args {
pub flag_config: String,
pub flag_target: String,

pub flag_n: bool,
pub flag_c: bool,
pub flag_h: bool,
pub flag_v: bool,
Expand Down Expand Up @@ -93,7 +96,11 @@ fn main() {
show("The list of files received is empty");
}
let watches = Watches::new(rules::from_string(content, arg_command));
execute(WatchCommand::new(watches, args.flag_V));
if args.flag_n {
execute(WatchNonBlockCommand::new(watches, args.flag_V));
} else {
execute(WatchCommand::new(watches, args.flag_V));
}
}
Err(err) => error("Error while reading stdin", err),
};
Expand Down Expand Up @@ -125,10 +132,20 @@ fn main() {

show("Finished there is no task to run");
} else {
execute(WatchCommand::new(Watches::new(filtered), args.flag_V));
let watches = Watches::new(filtered);

if args.flag_n {
execute(WatchNonBlockCommand::new(watches, args.flag_V));
} else {
execute(WatchCommand::new(watches, args.flag_V));
}
}
} else {
execute(WatchCommand::new(Watches::new(rules), args.flag_V));
if args.flag_n {
execute(WatchNonBlockCommand::new(Watches::new(rules), args.flag_V));
} else {
execute(WatchCommand::new(Watches::new(rules), args.flag_V));
}
}
}
Err(err) => error("Error while reading config file", err),
Expand Down
2 changes: 1 addition & 1 deletion src/stdout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub fn info(msg: &str) {
}

pub fn error(msg: &str) {
println!("Funzzy error: {}", msg);
println!("Funzzy ERROR: {}", msg);
}

pub fn verbose(msg: &str) {
Expand Down
Loading

0 comments on commit ffdc34a

Please sign in to comment.