Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(unstable): deno run --watch #7382

Merged
merged 14 commits into from
Sep 11, 2020
89 changes: 89 additions & 0 deletions cli/file_watcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use crate::colors;
use deno_core::ErrBox;
use futures::stream::StreamExt;
use futures::Future;
use notify::event::Event as NotifyEvent;
use notify::event::EventKind;
use notify::Error as NotifyError;
use notify::RecommendedWatcher;
use notify::RecursiveMode;
use notify::Watcher;
use std::path::PathBuf;
use std::pin::Pin;
use tokio::select;
use tokio::sync::mpsc;

// TODO(bartlomieju): rename
type WatchFuture =
Pin<Box<dyn Future<Output = std::result::Result<(), deno_core::ErrBox>>>>;

async fn error_handler(watch_future: WatchFuture) {
let result = watch_future.await;
if let Err(err) = result {
let msg = format!("{}: {}", colors::red_bold("error"), err.to_string(),);
eprintln!("{}", msg);
}
}

pub async fn watch_func<F>(
watch_paths: &[PathBuf],
closure: F,
) -> Result<(), ErrBox>
where
F: Fn() -> WatchFuture,
{
loop {
let func = error_handler(closure());
let mut is_file_changed = false;
select! {
_ = file_watcher(watch_paths) => {
is_file_changed = true;
info!(
"{} File change detected! Restarting!",
colors::intense_blue("Watcher")
);
},
_ = func => { },
};
if !is_file_changed {
info!(
"{} Process terminated! Restarting on file change...",
colors::intense_blue("Watcher")
);
file_watcher(watch_paths).await?;
info!(
"{} File change detected! Restarting!",
colors::intense_blue("Watcher")
);
}
}
}

pub async fn file_watcher(paths: &[PathBuf]) -> Result<(), deno_core::ErrBox> {
let (sender, mut receiver) = mpsc::channel::<Result<NotifyEvent, ErrBox>>(16);
let sender = std::sync::Mutex::new(sender);

let mut watcher: RecommendedWatcher =
Watcher::new_immediate(move |res: Result<NotifyEvent, NotifyError>| {
let res2 = res.map_err(ErrBox::from);
let mut sender = sender.lock().unwrap();
// Ignore result, if send failed it means that watcher was already closed,
// but not all messages have been flushed.
let _ = sender.try_send(res2);
})?;

for path in paths {
watcher.watch(path, RecursiveMode::NonRecursive)?;
}

while let Some(result) = receiver.next().await {
let event = result?;
match event.kind {
EventKind::Create(_) => break,
EventKind::Modify(_) => break,
EventKind::Remove(_) => break,
_ => continue,
}
}
Ok(())
}
29 changes: 29 additions & 0 deletions cli/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ pub struct Flags {
pub unstable: bool,
pub v8_flags: Option<Vec<String>>,
pub version: bool,
pub watch: bool,
pub write_allowlist: Vec<PathBuf>,
}

Expand Down Expand Up @@ -562,6 +563,7 @@ fn run_parse(flags: &mut Flags, matches: &clap::ArgMatches) {
flags.argv.push(v);
}

flags.watch = matches.is_present("watch");
flags.subcommand = DenoSubcommand::Run { script };
}

Expand Down Expand Up @@ -1157,6 +1159,7 @@ fn run_test_args<'a, 'b>(app: App<'a, 'b>) -> App<'a, 'b> {

fn run_subcommand<'a, 'b>() -> App<'a, 'b> {
run_test_args(SubCommand::with_name("run"))
.arg(watch_arg())
.setting(AppSettings::TrailingVarArg)
.arg(script_arg())
.about("Run a program given a filename or url to the module. Use '-' as a filename to read from stdin.")
Expand Down Expand Up @@ -1409,6 +1412,16 @@ fn v8_flags_arg_parse(flags: &mut Flags, matches: &ArgMatches) {
}
}

fn watch_arg<'a, 'b>() -> Arg<'a, 'b> {
Arg::with_name("watch")
.long("watch")
.help("Watch for file changes and restart process automatically")
lucacasonato marked this conversation as resolved.
Show resolved Hide resolved
.long_help(
"Watch for file changes and restart process automatically.
Only local files from entry point module graph are watched.",
)
}

fn no_check_arg<'a, 'b>() -> Arg<'a, 'b> {
Arg::with_name("no-check")
.long("no-check")
Expand Down Expand Up @@ -1560,6 +1573,22 @@ mod tests {
);
}

#[test]
fn run_watch() {
let r = flags_from_vec_safe(svec!["deno", "run", "--watch", "script.ts"]);
let flags = r.unwrap();
assert_eq!(
flags,
Flags {
subcommand: DenoSubcommand::Run {
script: "script.ts".to_string(),
},
watch: true,
..Flags::default()
}
);
}

#[test]
fn run_reload_allow_write() {
let r = flags_from_vec_safe(svec![
Expand Down
108 changes: 85 additions & 23 deletions cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod diff;
mod disk_cache;
pub mod errors;
mod file_fetcher;
mod file_watcher;
pub mod flags;
mod flags_allow_net;
mod fmt;
Expand Down Expand Up @@ -229,6 +230,8 @@ async fn cache_command(flags: Flags, files: Vec<String>) -> Result<(), ErrBox> {

for file in files {
let specifier = ModuleSpecifier::resolve_url_or_path(&file)?;
// TODO(bartlomieju): don't use `preload_module` in favor of calling "GlobalState::prepare_module_load()"
// explicitly? Seems wasteful to create multiple worker just to run TS compiler
worker.preload_module(&specifier).await.map(|_| ())?;
}

Expand Down Expand Up @@ -436,33 +439,29 @@ async fn run_repl(flags: Flags) -> Result<(), ErrBox> {
}
}

async fn run_command(flags: Flags, script: String) -> Result<(), ErrBox> {
async fn run_from_stdin(flags: Flags) -> Result<(), ErrBox> {
let global_state = GlobalState::new(flags.clone())?;
let main_module = if script != "-" {
ModuleSpecifier::resolve_url_or_path(&script).unwrap()
} else {
ModuleSpecifier::resolve_url_or_path("./__$deno$stdin.ts").unwrap()
};
let main_module =
ModuleSpecifier::resolve_url_or_path("./__$deno$stdin.ts").unwrap();
let mut worker =
MainWorker::create(&global_state.clone(), main_module.clone())?;
if script == "-" {
let mut source = Vec::new();
std::io::stdin().read_to_end(&mut source)?;
let main_module_url = main_module.as_url().to_owned();
// Create a dummy source file.
let source_file = SourceFile {
filename: main_module_url.to_file_path().unwrap(),
url: main_module_url,
types_header: None,
media_type: MediaType::TypeScript,
source_code: source.into(),
};
// Save our fake file into file fetcher cache
// to allow module access by TS compiler
global_state
.file_fetcher
.save_source_file_in_cache(&main_module, source_file);

let mut source = Vec::new();
std::io::stdin().read_to_end(&mut source)?;
let main_module_url = main_module.as_url().to_owned();
// Create a dummy source file.
let source_file = SourceFile {
filename: main_module_url.to_file_path().unwrap(),
url: main_module_url,
types_header: None,
media_type: MediaType::TypeScript,
source_code: source.into(),
};
// Save our fake file into file fetcher cache
// to allow module access by TS compiler
global_state
.file_fetcher
.save_source_file_in_cache(&main_module, source_file);

debug!("main_module {}", main_module);
worker.execute_module(&main_module).await?;
Expand All @@ -472,6 +471,69 @@ async fn run_command(flags: Flags, script: String) -> Result<(), ErrBox> {
Ok(())
}

async fn run_with_watch(flags: Flags, script: String) -> Result<(), ErrBox> {
let main_module = ModuleSpecifier::resolve_url_or_path(&script)?;
let global_state = GlobalState::new(flags.clone())?;

let mut module_graph_loader = module_graph::ModuleGraphLoader::new(
global_state.file_fetcher.clone(),
global_state.maybe_import_map.clone(),
Permissions::allow_all(),
false,
false,
);
module_graph_loader.add_to_graph(&main_module, None).await?;
let module_graph = module_graph_loader.get_graph();

// Find all local files in graph
let paths_to_watch: Vec<PathBuf> = module_graph
.values()
.map(|f| Url::parse(&f.url).unwrap())
.filter(|url| url.scheme() == "file")
.map(|url| url.to_file_path().unwrap())
.collect();

// FIXME(bartlomieju): new file watcher is created on after each restart
file_watcher::watch_func(&paths_to_watch, move || {
// FIXME(bartlomieju): GlobalState must be created on each restart - otherwise file fetcher
// will use cached source files
let gs = GlobalState::new(flags.clone()).unwrap();
let main_module = main_module.clone();
async move {
let mut worker = MainWorker::create(&gs, main_module.clone())?;
debug!("main_module {}", main_module);
worker.execute_module(&main_module).await?;
worker.execute("window.dispatchEvent(new Event('load'))")?;
(&mut *worker).await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?;
Ok(())
}
.boxed_local()
})
.await
}

async fn run_command(flags: Flags, script: String) -> Result<(), ErrBox> {
// Read script content from stdin
if script == "-" {
return run_from_stdin(flags).await;
}

if flags.watch {
return run_with_watch(flags, script).await;
}

let main_module = ModuleSpecifier::resolve_url_or_path(&script)?;
let global_state = GlobalState::new(flags.clone())?;
let mut worker = MainWorker::create(&global_state, main_module.clone())?;
debug!("main_module {}", main_module);
worker.execute_module(&main_module).await?;
worker.execute("window.dispatchEvent(new Event('load'))")?;
(&mut *worker).await?;
worker.execute("window.dispatchEvent(new Event('unload'))")?;
Ok(())
}

async fn test_command(
flags: Flags,
include: Option<Vec<String>>,
Expand Down
35 changes: 35 additions & 0 deletions cli/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,41 @@ fn info_with_compiled_source() {
assert_eq!(output.stderr, b"");
}

#[test]
fn run_watch() {
let t = TempDir::new().expect("tempdir fail");
let file_to_watch = t.path().join("file_to_watch.js");
std::fs::write(&file_to_watch, "console.log('Hello world');")
.expect("error writing file");

let mut child = util::deno_cmd()
.current_dir(util::root_path())
.arg("run")
.arg("--watch")
.arg(&file_to_watch)
.stdout(std::process::Stdio::piped())
.spawn()
.expect("failed to spawn script");

let stdout = child.stdout.as_mut().unwrap();
let mut stdout_lines =
std::io::BufReader::new(stdout).lines().map(|r| r.unwrap());

assert!(stdout_lines.next().unwrap().contains("Hello world"));
assert!(stdout_lines.next().unwrap().contains("Process terminated"));

// Change content of the file
std::fs::write(&file_to_watch, "console.log('Hello world2');")
.expect("error writing file");

assert!(stdout_lines.next().unwrap().contains("Restarting"));
assert!(stdout_lines.next().unwrap().contains("Hello world2"));
assert!(stdout_lines.next().unwrap().contains("Process terminated"));

child.kill().unwrap();
drop(t);
}

#[test]
fn repl_test_console_log() {
let (out, err) = util::run_and_collect_output(
Expand Down