Skip to content

Commit

Permalink
fix(watch): preserve ProcState::file_fetcher between restarts (#15466)
Browse files Browse the repository at this point in the history
This commit changes "ProcState" to store "file_fetcher" field in an "Arc",
allowing it to be preserved between restarts and thus keeping the state
alive between the restarts. File watchers for "deno test" and "deno bench"
now reset "ProcState" between restarts.
  • Loading branch information
nayeemrmn authored and dsherret committed Jan 13, 2023
1 parent a6676a0 commit 69f77d4
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 42 deletions.
4 changes: 1 addition & 3 deletions cli/cache/mod.rs
Expand Up @@ -50,12 +50,10 @@ pub struct FetchCacher {
impl FetchCacher {
pub fn new(
emit_cache: EmitCache,
file_fetcher: FileFetcher,
file_fetcher: Arc<FileFetcher>,
root_permissions: PermissionsContainer,
dynamic_permissions: PermissionsContainer,
) -> Self {
let file_fetcher = Arc::new(file_fetcher);

Self {
emit_cache,
dynamic_permissions,
Expand Down
1 change: 1 addition & 0 deletions cli/cache/node.rs
Expand Up @@ -24,6 +24,7 @@ struct CjsAnalysisData {
pub reexports: Vec<String>,
}

#[derive(Clone)]
pub struct NodeAnalysisCache {
db_file_path: Option<PathBuf>,
inner: Arc<Mutex<Option<Option<NodeAnalysisCacheInner>>>>,
Expand Down
8 changes: 8 additions & 0 deletions cli/cache/parsed_source.rs
Expand Up @@ -65,6 +65,14 @@ impl ParsedSourceCache {
}
}

pub fn reset_for_file_watcher(&self) -> Self {
Self {
db_cache_path: self.db_cache_path.clone(),
cli_version: self.cli_version.clone(),
sources: Default::default(),
}
}

pub fn get_parsed_source_from_module(
&self,
module: &deno_graph::Module,
Expand Down
8 changes: 4 additions & 4 deletions cli/graph_util.rs
Expand Up @@ -72,7 +72,7 @@ pub struct GraphData {

impl GraphData {
/// Store data from `graph` into `self`.
pub fn add_graph(&mut self, graph: &ModuleGraph, reload: bool) {
pub fn add_graph(&mut self, graph: &ModuleGraph) {
for graph_import in &graph.imports {
for dep in graph_import.dependencies.values() {
for resolved in [&dep.maybe_code, &dep.maybe_type] {
Expand All @@ -96,7 +96,7 @@ impl GraphData {
continue;
}

if !reload && self.modules.contains_key(specifier) {
if self.modules.contains_key(specifier) {
continue;
}

Expand Down Expand Up @@ -470,7 +470,7 @@ impl GraphData {
impl From<&ModuleGraph> for GraphData {
fn from(graph: &ModuleGraph) -> Self {
let mut graph_data = GraphData::default();
graph_data.add_graph(graph, false);
graph_data.add_graph(graph);
graph_data
}
}
Expand Down Expand Up @@ -542,7 +542,7 @@ pub async fn create_graph_and_maybe_check(

let check_js = ps.options.check_js();
let mut graph_data = GraphData::default();
graph_data.add_graph(&graph, false);
graph_data.add_graph(&graph);
graph_data
.check(
&graph.roots,
Expand Down
1 change: 0 additions & 1 deletion cli/module_loader.rs
Expand Up @@ -277,7 +277,6 @@ impl ModuleLoader for CliModuleLoader {
lib,
root_permissions,
dynamic_permissions,
false,
)
.await
}
Expand Down
52 changes: 39 additions & 13 deletions cli/proc_state.rs
Expand Up @@ -77,7 +77,7 @@ pub struct ProcState(Arc<Inner>);

pub struct Inner {
pub dir: DenoDir,
pub file_fetcher: FileFetcher,
pub file_fetcher: Arc<FileFetcher>,
pub http_client: HttpClient,
pub options: Arc<CliOptions>,
pub emit_cache: EmitCache,
Expand Down Expand Up @@ -147,6 +147,38 @@ impl ProcState {
Ok(ps)
}

/// Reset all runtime state to its default. This should be used on file
/// watcher restarts.
pub fn reset_for_file_watcher(&mut self) {
self.0 = Arc::new(Inner {
dir: self.dir.clone(),
options: self.options.clone(),
emit_cache: self.emit_cache.clone(),
emit_options_hash: self.emit_options_hash,
emit_options: self.emit_options.clone(),
file_fetcher: self.file_fetcher.clone(),
http_client: self.http_client.clone(),
graph_data: Default::default(),
lockfile: self.lockfile.clone(),
maybe_import_map: self.maybe_import_map.clone(),
maybe_inspector_server: self.maybe_inspector_server.clone(),
root_cert_store: self.root_cert_store.clone(),
blob_store: Default::default(),
broadcast_channel: Default::default(),
shared_array_buffer_store: Default::default(),
compiled_wasm_module_store: Default::default(),
parsed_source_cache: self.parsed_source_cache.reset_for_file_watcher(),
maybe_resolver: self.maybe_resolver.clone(),
maybe_file_watcher_reporter: self.maybe_file_watcher_reporter.clone(),
node_analysis_cache: self.node_analysis_cache.clone(),
npm_cache: self.npm_cache.clone(),
npm_resolver: self.npm_resolver.clone(),
cjs_resolutions: Default::default(),
progress_bar: self.progress_bar.clone(),
node_std_graph_prepared: AtomicBool::new(false),
});
}

async fn build_with_sender(
cli_options: Arc<CliOptions>,
maybe_sender: Option<tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>>,
Expand Down Expand Up @@ -256,7 +288,7 @@ impl ProcState {
.write_hashable(&emit_options)
.finish(),
emit_options,
file_fetcher,
file_fetcher: Arc::new(file_fetcher),
http_client,
graph_data: Default::default(),
lockfile,
Expand Down Expand Up @@ -291,7 +323,6 @@ impl ProcState {
lib: TsTypeLib,
root_permissions: PermissionsContainer,
dynamic_permissions: PermissionsContainer,
reload_on_watch: bool,
) -> Result<(), AnyError> {
log::debug!("Preparing module load.");
let _pb_clear_guard = self.progress_bar.clear_guard();
Expand All @@ -304,7 +335,7 @@ impl ProcState {
.map(|s| (s, ModuleKind::Esm))
.collect::<Vec<_>>();

if !reload_on_watch && !has_root_npm_specifier {
if !has_root_npm_specifier {
let graph_data = self.graph_data.read();
if self.options.type_check_mode() == TypeCheckMode::None
|| graph_data.is_type_checked(&roots, &lib)
Expand Down Expand Up @@ -338,7 +369,6 @@ impl ProcState {
struct ProcStateLoader<'a> {
inner: &'a mut cache::FetchCacher,
graph_data: Arc<RwLock<GraphData>>,
reload: bool,
}
impl Loader for ProcStateLoader<'_> {
fn get_cache_info(
Expand All @@ -355,17 +385,14 @@ impl ProcState {
let graph_data = self.graph_data.read();
let found_specifier = graph_data.follow_redirect(specifier);
match graph_data.get(&found_specifier) {
Some(_) if !self.reload => {
Box::pin(futures::future::ready(Err(anyhow!(""))))
}
Some(_) => Box::pin(futures::future::ready(Err(anyhow!("")))),
_ => self.inner.load(specifier, is_dynamic),
}
}
}
let mut loader = ProcStateLoader {
inner: &mut cache,
graph_data: self.graph_data.clone(),
reload: reload_on_watch,
};

let maybe_file_watcher_reporter: Option<&dyn deno_graph::source::Reporter> =
Expand Down Expand Up @@ -404,7 +431,7 @@ impl ProcState {

let npm_package_reqs = {
let mut graph_data = self.graph_data.write();
graph_data.add_graph(&graph, reload_on_watch);
graph_data.add_graph(&graph);
let check_js = self.options.check_js();
graph_data
.check(
Expand Down Expand Up @@ -492,7 +519,6 @@ impl ProcState {
lib,
PermissionsContainer::allow_all(),
PermissionsContainer::allow_all(),
false,
)
.await
}
Expand All @@ -506,7 +532,7 @@ impl ProcState {
let node_std_graph = self
.create_graph(vec![(node::MODULE_ALL_URL.clone(), ModuleKind::Esm)])
.await?;
self.graph_data.write().add_graph(&node_std_graph, false);
self.graph_data.write().add_graph(&node_std_graph);
self.node_std_graph_prepared.store(true, Ordering::Relaxed);
Ok(())
}
Expand Down Expand Up @@ -747,7 +773,7 @@ pub fn import_map_from_text(
Ok(result.import_map)
}

#[derive(Debug)]
#[derive(Clone, Debug)]
struct FileWatcherReporter {
sender: tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>,
file_paths: Arc<Mutex<Vec<PathBuf>>>,
Expand Down
40 changes: 39 additions & 1 deletion cli/tests/watcher_tests.rs
Expand Up @@ -1098,6 +1098,44 @@ mod watcher {
check_alive_then_kill(child);
}

// Regression test for https://github.com/denoland/deno/issues/15465.
#[test]
fn run_watch_reload_once() {
let _g = util::http_server();
let t = TempDir::new();
let file_to_watch = t.path().join("file_to_watch.js");
let file_content = r#"
import { time } from "http://localhost:4545/dynamic_module.ts";
console.log(time);
"#;
write(&file_to_watch, file_content).unwrap();

let mut child = util::deno_cmd()
.current_dir(util::testdata_path())
.arg("run")
.arg("--watch")
.arg("--reload")
.arg(&file_to_watch)
.env("NO_COLOR", "1")
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
let (mut stdout_lines, mut stderr_lines) = child_lines(&mut child);

wait_contains("finished", &mut stderr_lines);
let first_output = stdout_lines.next().unwrap();

write(&file_to_watch, file_content).unwrap();
// The remote dynamic module should not have been reloaded again.

wait_contains("finished", &mut stderr_lines);
let second_output = stdout_lines.next().unwrap();
assert_eq!(second_output, first_output);

check_alive_then_kill(child);
}

#[test]
fn run_watch_dynamic_imports() {
let t = TempDir::new();
Expand Down Expand Up @@ -1159,11 +1197,11 @@ mod watcher {
&mut stdout_lines,
);

wait_contains("finished", &mut stderr_lines);
wait_for(
|m| m.contains("Watching paths") && m.contains("imported2.js"),
&mut stderr_lines,
);
wait_contains("finished", &mut stderr_lines);

write(
&file_to_watch3,
Expand Down
12 changes: 8 additions & 4 deletions cli/tools/bench.rs
Expand Up @@ -32,6 +32,7 @@ use indexmap::IndexMap;
use log::Level;
use serde::Deserialize;
use serde::Serialize;
use std::cell::RefCell;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
Expand Down Expand Up @@ -339,7 +340,6 @@ async fn check_specifiers(
lib,
PermissionsContainer::allow_all(),
PermissionsContainer::new(permissions),
true,
)
.await?;

Expand Down Expand Up @@ -538,13 +538,15 @@ pub async fn run_benchmarks_with_watch(
.collect();
let no_check = ps.options.type_check_mode() == TypeCheckMode::None;

let ps = RefCell::new(ps);

let resolver = |changed: Option<Vec<PathBuf>>| {
let paths_to_watch = paths_to_watch.clone();
let paths_to_watch_clone = paths_to_watch.clone();

let files_changed = changed.is_some();
let files = bench_options.files.clone();
let ps = ps.clone();
let ps = ps.borrow().clone();

async move {
let bench_modules = collect_specifiers(&files, is_supported_bench_path)?;
Expand Down Expand Up @@ -656,7 +658,8 @@ pub async fn run_benchmarks_with_watch(

let operation = |modules_to_reload: Vec<(ModuleSpecifier, ModuleKind)>| {
let permissions = permissions.clone();
let ps = ps.clone();
ps.borrow_mut().reset_for_file_watcher();
let ps = ps.borrow().clone();
let filter = bench_options.filter.clone();
let files = bench_options.files.clone();

Expand All @@ -681,12 +684,13 @@ pub async fn run_benchmarks_with_watch(
}
};

let clear_screen = !ps.borrow().options.no_clear_screen();
file_watcher::watch_func(
resolver,
operation,
file_watcher::PrintConfig {
job_name: "Bench".to_string(),
clear_screen: !ps.options.no_clear_screen(),
clear_screen,
},
)
.await?;
Expand Down
16 changes: 6 additions & 10 deletions cli/tools/run.rs
@@ -1,7 +1,6 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;

use deno_ast::MediaType;
Expand Down Expand Up @@ -104,16 +103,13 @@ async fn run_with_watch(flags: Flags, script: String) -> Result<i32, AnyError> {
let flags = Arc::new(flags);
let main_module = resolve_url_or_path(&script)?;
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let mut ps =
ProcState::build_for_file_watcher((*flags).clone(), sender.clone()).await?;

let operation = |(sender, main_module): (
tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>,
ModuleSpecifier,
)| {
let flags = flags.clone();
let operation = |main_module: ModuleSpecifier| {
ps.reset_for_file_watcher();
let ps = ps.clone();
Ok(async move {
let ps =
ProcState::build_for_file_watcher((*flags).clone(), sender.clone())
.await?;
let permissions = PermissionsContainer::new(Permissions::from_options(
&ps.options.permissions_options(),
)?);
Expand All @@ -128,7 +124,7 @@ async fn run_with_watch(flags: Flags, script: String) -> Result<i32, AnyError> {
util::file_watcher::watch_func2(
receiver,
operation,
(sender, main_module),
main_module,
util::file_watcher::PrintConfig {
job_name: "Process".to_string(),
clear_screen: !flags.no_clear_screen,
Expand Down

0 comments on commit 69f77d4

Please sign in to comment.