Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LOG-3949: Vector not releasing deleted file handles #154

Merged
merged 1 commit into from Oct 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile.unit
Expand Up @@ -29,7 +29,7 @@ COPY . /src

ENV PROTOC=/src/thirdparty/protoc/protoc-linux-x86_64

RUN chmod -R 777 /src $CARGO_HOME
RUN chmod -R 777 /src $CARGO_HOME $HOME
RUN mkdir -p ~/.cargo/bin && \
for plugin in nextest deny; do \
ln -s /src/thirdparty/cargo-${plugin}/cargo-${plugin}-linux-$(arch) ~/.cargo/bin/cargo-${plugin}; \
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -298,7 +298,7 @@ target/%/vector.tar.gz: target/%/vector CARGO_HANDLES_FRESHNESS
# https://github.com/rust-lang/cargo/issues/6454
.PHONY: test
test: ## Run the unit test suite
${MAYBE_ENVIRONMENT_EXEC} cargo nextest run -v --workspace --no-fail-fast --no-default-features --features "${FEATURES}" ${SCOPE}
${MAYBE_ENVIRONMENT_EXEC} cargo nextest run -v --workspace --no-fail-fast --no-default-features --features "${FEATURES}" --test-threads 1 ${SCOPE}

.PHONY: test-docs
test-docs: ## Run the docs test suite
Expand Down
8 changes: 8 additions & 0 deletions lib/file-source/src/file_server.rs
Expand Up @@ -52,6 +52,7 @@ where
pub remove_after: Option<Duration>,
pub emitter: E,
pub handle: tokio::runtime::Handle,
pub rotate_wait: Duration,
}

/// `FileServer` as Source
Expand Down Expand Up @@ -292,6 +293,13 @@ where
}
}

for (_, watcher) in &mut fp_map {
if !watcher.file_findable() && watcher.last_seen().elapsed() > self.rotate_wait {
self.emitter.emit_gave_up_on_deleted_file(&watcher.path);
watcher.set_dead();
}
}

// A FileWatcher is dead when the underlying file has disappeared.
// If the FileWatcher is dead we don't retain it; it will be deallocated.
fp_map.retain(|file_id, watcher| {
Expand Down
10 changes: 10 additions & 0 deletions lib/file-source/src/file_watcher/mod.rs
Expand Up @@ -44,6 +44,7 @@ pub struct FileWatcher {
is_dead: bool,
last_read_attempt: Instant,
last_read_success: Instant,
last_seen: Instant,
max_line_bytes: usize,
line_delimiter: Bytes,
buf: BytesMut,
Expand Down Expand Up @@ -145,6 +146,7 @@ impl FileWatcher {
is_dead: false,
last_read_attempt: ts,
last_read_success: ts,
last_seen: ts,
max_line_bytes,
line_delimiter,
buf: BytesMut::new(),
Expand Down Expand Up @@ -176,6 +178,9 @@ impl FileWatcher {

pub fn set_file_findable(&mut self, f: bool) {
self.findable = f;
if f {
self.last_seen = Instant::now();
}
}

pub fn file_findable(&self) -> bool {
Expand Down Expand Up @@ -268,6 +273,11 @@ impl FileWatcher {
self.last_read_success.elapsed() < Duration::from_secs(10)
|| self.last_read_attempt.elapsed() > Duration::from_secs(10)
}

#[inline]
pub fn last_seen(&self) -> Instant {
self.last_seen
}
}

fn is_gzipped(r: &mut io::BufReader<fs::File>) -> io::Result<bool> {
Expand Down
2 changes: 2 additions & 0 deletions lib/file-source/src/fingerprinter.rs
Expand Up @@ -554,5 +554,7 @@ mod test {
fn emit_files_open(&self, _: usize) {}

fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {}

fn emit_gave_up_on_deleted_file(&self, _: &Path) {}
}
}
2 changes: 2 additions & 0 deletions lib/file-source/src/internal_events.rs
Expand Up @@ -26,4 +26,6 @@ pub trait FileSourceInternalEvents: Send + Sync + Clone + 'static {
fn emit_files_open(&self, count: usize);

fn emit_path_globbing_failed(&self, path: &Path, error: &Error);

fn emit_gave_up_on_deleted_file(&self, path: &Path);
}
23 changes: 23 additions & 0 deletions src/internal_events/file.rs
Expand Up @@ -404,6 +404,25 @@ mod source {
}
}

#[derive(Debug)]
pub struct GaveUpOnDeletedFile<'a> {
pub file: &'a Path,
}

impl<'a> InternalEvent for GaveUpOnDeletedFile<'a> {
fn emit(self) {
info!(
message = "Gave up on deleted file.",
file = %self.file.display(),
);
counter!(
"files_deleted_given_up_total", 1,
"file" => self.file.to_string_lossy().into_owned(),
);
}
}


#[derive(Clone)]
pub struct FileSourceInternalEventsEmitter;

Expand Down Expand Up @@ -458,5 +477,9 @@ mod source {
fn emit_path_globbing_failed(&self, path: &Path, error: &Error) {
emit!(PathGlobbingError { path, error });
}

fn emit_gave_up_on_deleted_file(&self, file: &Path) {
emit!(GaveUpOnDeletedFile { file });
}
}
}
12 changes: 12 additions & 0 deletions src/sources/file.rs
Expand Up @@ -234,6 +234,12 @@ pub struct FileConfig {
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,

/// How long to keep an open handle to a rotated log file.
#[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
#[configurable(metadata(docs::type_unit = "milliseconds"))]
#[serde(default = "default_rotate_wait_ms")]
pub rotate_wait_ms: Duration,
}

fn default_max_line_bytes() -> usize {
Expand Down Expand Up @@ -268,6 +274,10 @@ fn default_line_delimiter() -> String {
"\n".to_string()
}

const fn default_rotate_wait_ms() -> Duration {
Duration::from_millis(u64::MAX/1000)
}

/// Configuration for how files should be identified.
///
/// This is important for `checkpointing` when file rotation is used.
Expand Down Expand Up @@ -385,6 +395,7 @@ impl Default for FileConfig {
encoding: None,
acknowledgements: Default::default(),
log_namespace: None,
rotate_wait_ms: default_rotate_wait_ms()
}
}
}
Expand Down Expand Up @@ -532,6 +543,7 @@ pub fn file_source(
remove_after: config.remove_after_secs.map(Duration::from_secs),
emitter: FileSourceInternalEventsEmitter,
handle: tokio::runtime::Handle::current(),
rotate_wait: config.rotate_wait_ms
};

let event_metadata = EventMetadata {
Expand Down
15 changes: 15 additions & 0 deletions src/sources/kubernetes_logs/mod.rs
Expand Up @@ -232,6 +232,12 @@ pub struct Config {
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,

/// How long to keep an open handle to a rotated log file.
#[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
#[configurable(metadata(docs::type_unit = "milliseconds"))]
#[serde(default = "default_rotate_wait_ms")]
rotate_wait_ms: Duration
}

const fn default_read_from() -> ReadFromConfig {
Expand Down Expand Up @@ -273,6 +279,7 @@ impl Default for Config {
kube_config_file: None,
delay_deletion_ms: default_delay_deletion_ms(),
log_namespace: None,
rotate_wait_ms: default_rotate_wait_ms()
}
}
}
Expand Down Expand Up @@ -519,6 +526,7 @@ struct Source {
glob_minimum_cooldown: Duration,
ingestion_timestamp_field: Option<OwnedTargetPath>,
delay_deletion: Duration,
rotate_wait: Duration
}

impl Source {
Expand Down Expand Up @@ -595,6 +603,7 @@ impl Source {
glob_minimum_cooldown,
ingestion_timestamp_field,
delay_deletion,
rotate_wait: config.rotate_wait_ms
})
}

Expand Down Expand Up @@ -625,6 +634,7 @@ impl Source {
glob_minimum_cooldown,
ingestion_timestamp_field,
delay_deletion,
rotate_wait
} = self;

let mut reflectors = Vec::new();
Expand Down Expand Up @@ -755,6 +765,7 @@ impl Source {
emitter: FileSourceInternalEventsEmitter,
// A handle to the current tokio runtime
handle: tokio::runtime::Handle::current(),
rotate_wait
};

let (file_source_tx, file_source_rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
Expand Down Expand Up @@ -951,6 +962,10 @@ const fn default_delay_deletion_ms() -> Duration {
Duration::from_millis(60_000)
}

const fn default_rotate_wait_ms() -> Duration {
Duration::from_millis(u64::MAX/1000)
}

// This function constructs the patterns we exclude from file watching, created
// from the defaults or user provided configuration.
fn prepare_exclude_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
Expand Down