Skip to content

Commit

Permalink
fix(cubestore): FreeInMemoryChunks fires if there is no inmemory chun…
Browse files Browse the repository at this point in the history
…ks (#6771)
  • Loading branch information
waralexrom committed Jun 28, 2023
1 parent f3b4d57 commit f9cfd77
Showing 1 changed file with 10 additions and 28 deletions.
38 changes: 10 additions & 28 deletions rust/cubestore/cubestore/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,20 @@ crate::di_service!(SchedulerImpl, []);

struct LastNodeActionTimes {
in_memory_compaction: SystemTime,
deleted_chunks_release: SystemTime,
}

impl LastNodeActionTimes {
pub fn new() -> Self {
Self {
in_memory_compaction: SystemTime::now(),
deleted_chunks_release: SystemTime::now(),
}
}
pub fn in_memory_compaction(&self) -> SystemTime {
self.in_memory_compaction.clone()
}
pub fn deleted_chunks_release(&self) -> SystemTime {
self.deleted_chunks_release.clone()
}
pub fn set_in_memory_compaction(&mut self, time: SystemTime) {
self.in_memory_compaction = time;
}
pub fn set_deleted_chunks_release(&mut self, time: SystemTime) {
self.deleted_chunks_release = time;
}
}

impl SchedulerImpl {
Expand Down Expand Up @@ -758,7 +750,16 @@ impl SchedulerImpl {
tokio::fs::remove_file(file).await?;
}
if let MetaStoreEvent::DeleteChunk(chunk) = &event {
if !chunk.get_row().in_memory() && chunk.get_row().uploaded() {
if chunk.get_row().in_memory() {
let partition = self
.meta_store
.get_partition(chunk.get_row().get_partition_id())
.await?;
let node_name = self.cluster.node_name_by_partition(&partition);
self.cluster
.free_memory_chunk(&node_name, chunk.get_id())
.await?;
} else if chunk.get_row().uploaded() {
let file_name =
ChunkStore::chunk_remote_path(chunk.get_id(), chunk.get_row().suffix());
let deadline = Instant::now()
Expand Down Expand Up @@ -898,25 +899,6 @@ impl SchedulerImpl {
self.process_active_chunks(active_chunks).await?;
self.process_inactive_chunks(inactive_chunks).await?;
}
let mut node_last_actions = self.node_last_actions.lock().await;
for (node, last_action) in node_last_actions.iter_mut() {
if last_action
.deleted_chunks_release()
.elapsed()
.ok()
.map_or(false, |d| d >= Duration::from_secs(2))
{
if let Err(e) = self.cluster.free_deleted_memory_chunks(&node).await {
log::error!(
"Error while trying release in memory chunks in node {}: {}",
node,
e
);
} else {
last_action.set_deleted_chunks_release(SystemTime::now());
}
}
}

Ok(())
}
Expand Down

0 comments on commit f9cfd77

Please sign in to comment.