From 4d9f27c6b065a976240ca8beedfa50be830bdbf1 Mon Sep 17 00:00:00 2001 From: f1shl3gs Date: Sat, 8 Jun 2024 04:57:33 +0800 Subject: [PATCH] remove notify (#1777) --- Cargo.lock | 71 ------ lib/framework/Cargo.toml | 2 - lib/framework/src/config/watcher.rs | 334 +++++++++++++++++++++------- src/launch.rs | 2 +- 4 files changed, 251 insertions(+), 158 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dcbf7ef9d..25ffc08ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -956,18 +956,6 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" -[[package]] -name = "filetime" -version = "0.2.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd" -dependencies = [ - "cfg-if", - "libc", - "redox_syscall 0.4.1", - "windows-sys 0.52.0", -] - [[package]] name = "finalize" version = "0.1.0" @@ -1061,7 +1049,6 @@ dependencies = [ "log_schema", "memchr", "metrics", - "notify", "once_cell", "pem", "percent-encoding", @@ -1730,26 +1717,6 @@ dependencies = [ "str_stack", ] -[[package]] -name = "inotify" -version = "0.9.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" -dependencies = [ - "bitflags 1.3.2", - "inotify-sys", - "libc", -] - -[[package]] -name = "inotify-sys" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" -dependencies = [ - "libc", -] - [[package]] name = "inout" version = "0.1.3" @@ -1910,26 +1877,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "kqueue" -version = "1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" -dependencies = [ - "kqueue-sys", - "libc", -] - -[[package]] -name = "kqueue-sys" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" -dependencies = [ - "bitflags 1.3.2", - "libc", -] - [[package]] name = "kube" version = "0.91.0" @@ -2235,7 +2182,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", - "log", "wasi", "windows-sys 0.48.0", ] @@ -2283,23 +2229,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "notify" -version = "6.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6205bd8bb1e454ad2e27422015fb5e4f2bcc7e08fa8f27058670d208324a4d2d" -dependencies = [ - "bitflags 2.5.0", - "filetime", - "inotify", - "kqueue", - "libc", - "log", - "mio", - "walkdir", - "windows-sys 0.48.0", -] - [[package]] name = "ntp" version = "0.1.0" diff --git a/lib/framework/Cargo.toml b/lib/framework/Cargo.toml index 0b5f60c3c..aca498837 100644 --- a/lib/framework/Cargo.toml +++ b/lib/framework/Cargo.toml @@ -57,8 +57,6 @@ log_schema = { path = "../log_schema" } bytesize = { path = "../bytesize" } memchr = { version = "2.7.2", default-features = false } metrics = { path = "../metrics" } -# wired feature requirement, see: https://docs.rs/notify/latest/notify/#crossbeam-channel--tokio -notify = { version = "6.1.1", default-features = false, features = ["macos_kqueue"] } once_cell = { version = "1.19.0", default-features = false } pem = { version = "3.0.4", default-features = false, features = ["std"] } percent-encoding = { version = "2.3.1", default-features = false } diff --git a/lib/framework/src/config/watcher.rs b/lib/framework/src/config/watcher.rs index 88c64fd0d..a5e76f0e8 100644 --- a/lib/framework/src/config/watcher.rs +++ b/lib/framework/src/config/watcher.rs @@ -1,12 +1,224 @@ use std::path::PathBuf; -use std::sync::mpsc::{channel, Receiver}; -use std::thread; use std::time::Duration; -use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; +use futures_util::StreamExt; use crate::Error; +mod inotify { + use std::ffi::{c_void, CString, OsStr}; + use std::io; + use std::os::fd::{AsRawFd, RawFd}; + use std::os::unix::ffi::OsStrExt; + use std::path::Path; + use std::pin::Pin; + use std::task::{ready, Context, Poll}; + + use futures::Stream; + use tokio::io::unix::AsyncFd; + + // the size of inotify_event + const EVENT_SIZE: usize = 16; + + pub struct Watcher { + fd: AsyncFd, + wds: Vec, + } + + impl Watcher { + pub fn new() -> io::Result { + let fd = unsafe { + let ret = libc::inotify_init1(libc::IN_CLOEXEC | libc::IN_NONBLOCK); + if ret == -1 { + return Err(io::Error::last_os_error()); + } + + ret + }; + + Ok(Watcher { + fd: AsyncFd::new(fd)?, + wds: vec![], + }) + } + + pub fn add(&mut self, path: impl AsRef) -> io::Result<()> { + let path = CString::new(path.as_ref().as_os_str().as_bytes())?; + + let wd = unsafe { + let ret = libc::inotify_add_watch( + self.fd.as_raw_fd(), + path.as_ptr() as *const _, + libc::IN_CLOSE_WRITE + | libc::IN_MOVE + | libc::IN_MOVED_TO + | libc::IN_MODIFY + | libc::IN_CREATE, + ); + if ret == -1 { + return Err(io::Error::last_os_error()); + } + + ret + }; + + self.wds.push(wd); + + Ok(()) + } + + pub fn into_stream(self, buf: &[u8]) -> EventStream { + EventStream { + fd: self.fd, + wds: self.wds, + buf, + } + } + } + + #[allow(dead_code)] + pub struct EventStream<'a> { + fd: AsyncFd, + wds: Vec, + buf: &'a [u8], + } + + impl<'a> Stream for EventStream<'a> { + type Item = io::Result>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + let mut guard = ready!(self.fd.poll_read_ready(cx))?; + + #[allow(clippy::blocks_in_conditions)] + match guard.try_io(|inner| { + let ret = unsafe { + libc::read( + inner.as_raw_fd(), + self.buf.as_ptr() as *mut c_void, + self.buf.len(), + ) + }; + if ret == -1 { + return Err(io::Error::last_os_error()); + } + + Ok(ret as usize) + }) { + Ok(Ok(len)) => { + return Poll::Ready(Some(Ok(Events { + buf: self.buf, + pos: 0, + len, + }))) + } + Ok(Err(err)) => return Poll::Ready(Some(Err(err))), + Err(_would_block) => continue, + } + } + } + } + + /// An inotify event + /// + /// A file system event that describes a change that the user previously + /// registered interest in. To watch for events. + pub struct Event { + /// Identifies the watch this event originates from. + pub wd: i32, + + /// Indicates what kind of event this is + pub mask: u32, + + /// Connects related events to each other + /// + /// When a file is renamed, this results two events: [`MOVED_FROM`] and + /// [`MOVED_TO`]. The `cookie` field will be the same for both of them, + /// thereby making is possible to connect the event pair. + pub cookie: u32, + + /// The name of the file the event originates from + /// + /// This field is set only if the subject of the event is a file or directory + /// in watched directory. If the event concerns a file or directory that is + /// watched directly, `name` will be `None`. + pub name: Option, + } + + /// Iterator over inotify events. + #[derive(Debug)] + pub struct Events<'a> { + buf: &'a [u8], + pos: usize, + len: usize, + } + + impl<'a> Iterator for Events<'a> { + type Item = Event<&'a OsStr>; + + fn next(&mut self) -> Option { + if self.pos < self.len { + let ev = unsafe { + let ptr = self.buf.as_ptr().add(self.pos) as *const libc::inotify_event; + ptr.read_unaligned() + }; + + let name = if ev.len == 0 { + None + } else { + let name = + &self.buf[self.pos + EVENT_SIZE..self.pos + EVENT_SIZE + ev.len as usize]; + let name = name.splitn(2, |b| b == &0u8).next().unwrap(); + + Some(OsStr::from_bytes(name)) + }; + + self.pos += EVENT_SIZE + ev.len as usize; + + Some(Event { + wd: ev.wd, + mask: ev.mask, + cookie: ev.cookie, + name, + }) + } else { + None + } + } + } + + #[cfg(test)] + mod tests { + use super::*; + use futures_util::StreamExt; + use std::fs::File; + use std::io::Write; + + use testify::temp_dir; + + #[tokio::test] + async fn write() { + let directory = temp_dir(); + let filepath = directory.join("test.txt"); + let mut file = File::create(&filepath).unwrap(); + + let mut watcher = Watcher::new().unwrap(); + watcher.add(directory).unwrap(); + let buf = [0u8; 4096]; + let mut stream = watcher.into_stream(&buf); + + file.write_all(&[0]).unwrap(); + file.sync_all().unwrap(); + + if let Some(Ok(_evs)) = stream.next().await { + // ok + } else { + panic!("change should detected"); + } + } + } +} + /// Per notify own documentation, it's advised to have delay of more than 30 sec, /// so to avoid receiving repetitions of previous events on MacOS. /// @@ -18,61 +230,49 @@ const DEFAULT_WATCH_DELAY: Duration = Duration::from_secs(1); const RETRY_TIMEOUT: Duration = Duration::from_secs(10); -pub fn spawn_thread<'a>( +pub fn watch_configs<'a>( config_paths: impl IntoIterator + 'a, - delay: impl Into>, ) -> Result<(), Error> { - let delay = delay.into().unwrap_or(DEFAULT_WATCH_DELAY); let config_paths: Vec<_> = config_paths.into_iter().cloned().collect(); - // Create watcher now so not to miss any changes happening between - // returning from this function and the thread starting. - let mut watcher = Some(create_watcher(&config_paths)?); + // first init + let mut watcher = inotify::Watcher::new()?; + config_paths.iter().try_for_each(|path| watcher.add(path))?; info!(message = "Watching configuration files"); - - thread::spawn(move || loop { - if let Some((mut watcher, receiver)) = watcher.take() { - while let Ok(event) = receiver.recv() { - if matches!( - event.kind, - EventKind::Create(_) | EventKind::Remove(_) | EventKind::Modify(_) - ) { - debug!(message = "Configuration file change detected", ?event); - - // Consume events until delay amount of time has passed since the latest event. - while receiver.recv_timeout(delay).is_ok() {} - - // We need to read paths to resolve any inode changes that may have happened. - // And we need to do it before raising sighup to avoid missing any change. - if let Err(err) = add_paths(&mut watcher, &config_paths) { - error!(message = "Failed to add files to watch", ?err); - break; + tokio::spawn(async move { + let buf = [0u8; 4096]; + let mut stream = watcher.into_stream(&buf); + + loop { + match stream.next().await { + Some(res) => match res { + Ok(_events) => { + info!(message = "Configuration file changed."); + raise_sighup(); } + Err(err) => { + error!(message = "read inotify failed, retrying watch", ?err); - info!(message = "Configuration file changed."); - raise_sighup(); - } else { - debug!(message = "Ignoring event", ?event); - } - } - } + // error occurs, sleep a while and retry + tokio::time::sleep(RETRY_TIMEOUT).await; - thread::sleep(RETRY_TIMEOUT); + drop(stream); - watcher = create_watcher(&config_paths) - .map_err(|err| { - error!(message = "Failed to create file watcher", ?err); - }) - .ok(); + let mut watcher = inotify::Watcher::new()?; + config_paths.iter().try_for_each(|path| watcher.add(path))?; + stream = watcher.into_stream(&buf); - if watcher.is_some() { - // Config files could have changed while we weren't watching, - // so for a good measure raise SIGHUP and let reload logic - // determine if anything changed. - info!(message = "Speculating that configuration files have changed",); + continue; + } + }, + None => { + // this shall never happen + return Ok::<(), Error>(()); + } + } - raise_sighup(); + tokio::time::sleep(DEFAULT_WATCH_DELAY).await; } }); @@ -90,40 +290,6 @@ fn raise_sighup() { } } -fn create_watcher( - config_paths: &[PathBuf], -) -> Result<(RecommendedWatcher, Receiver), Error> { - info!(message = "Creating configuration file watcher"); - - let (sender, receiver) = channel(); - let mut watcher = RecommendedWatcher::new( - move |result| match result { - Ok(event) => { - if let Err(err) = sender.send(event) { - warn!(message = "send notify event failed", ?err); - } - } - - Err(err) => { - error!(message = "receive notify event failed", ?err); - } - }, - Config::default(), - )?; - - add_paths(&mut watcher, config_paths)?; - - Ok((watcher, receiver)) -} - -fn add_paths(watcher: &mut RecommendedWatcher, config_paths: &[PathBuf]) -> Result<(), Error> { - config_paths.iter().try_for_each(|path| { - watcher - .watch(path, RecursiveMode::NonRecursive) - .map_err(Into::into) - }) -} - #[cfg(all(test, unix, not(target_os = "macos")))] mod tests { use std::fs::File; @@ -145,12 +311,12 @@ mod tests { #[tokio::test] async fn file_directory_update() { - let delay = Duration::from_secs(3); + let delay = Duration::from_secs(2); let directory = temp_dir(); let filepath = directory.join("test.txt"); let mut file = File::create(&filepath).unwrap(); - spawn_thread(&[directory], delay).unwrap(); + watch_configs(&[directory]).unwrap(); assert!(test(&mut file, delay * 5).await) } @@ -161,7 +327,7 @@ mod tests { let filepath = temp_file(); let mut file = File::create(&filepath).unwrap(); - spawn_thread(&[filepath], delay).unwrap(); + watch_configs(&[filepath]).unwrap(); assert!(test(&mut file, delay * 5).await) } @@ -174,7 +340,7 @@ mod tests { let mut file = File::create(&filepath).unwrap(); std::os::unix::fs::symlink(&filepath, &sym_file).unwrap(); - spawn_thread(&[sym_file], delay).unwrap(); + watch_configs(&[filepath]).unwrap(); assert!(test(&mut file, delay * 5).await); } diff --git a/src/launch.rs b/src/launch.rs index 7340e80cb..6c8ac22a8 100644 --- a/src/launch.rs +++ b/src/launch.rs @@ -199,7 +199,7 @@ impl RootCommand { #[cfg(all(unix, not(target_os = "macos")))] if watch_config { // Start listening for config changes immediately. - config::watcher::spawn_thread(config_paths.iter().map(Into::into), None) + config::watcher::watch_configs(config_paths.iter().map(Into::into)) .map_err(|err| { error!( message = "Unable to start config watcher",