From ac7956e53f63bbe35b736950fd28fb63387a78be Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Wed, 5 Jul 2023 19:09:39 +0800 Subject: [PATCH] Refine GC related codes in groot (#2973) Further refinement of #2971 --- .../store/groot/src/db/graph/types/edge.rs | 44 +++++---- .../store/groot/src/db/graph/types/vertex.rs | 92 ++++++++++--------- .../store/groot/src/db/graph/version.rs | 4 +- 3 files changed, 72 insertions(+), 68 deletions(-) diff --git a/interactive_engine/executor/store/groot/src/db/graph/types/edge.rs b/interactive_engine/executor/store/groot/src/db/graph/types/edge.rs index f7cb2b6a6f65..b5ba3e7b77e1 100644 --- a/interactive_engine/executor/store/groot/src/db/graph/types/edge.rs +++ b/interactive_engine/executor/store/groot/src/db/graph/types/edge.rs @@ -200,6 +200,15 @@ pub struct EdgeTypeManager { inner: Atomic, } +// https://docs.rs/crossbeam-epoch/0.7.2/crossbeam_epoch/struct.Atomic.html#method.into_owned +impl Drop for EdgeTypeManager { + fn drop(&mut self) { + unsafe { + drop(std::mem::replace(&mut self.inner, Atomic::null()).into_owned()); + } + } +} + impl EdgeTypeManager { pub fn new() -> Self { EdgeTypeManager { inner: Atomic::new(EdgeManagerInner::new()) } @@ -222,8 +231,8 @@ impl EdgeTypeManager { } pub fn get_edge_info(&self, si: SnapshotId, label: LabelId) -> GraphResult> { - let guard = epoch::pin(); - let inner = self.get_inner(&guard); + let guard = &epoch::pin(); + let inner = self.get_inner(guard); let ret = res_unwrap!(inner.get_edge_info(si, label), get_edge, si, label)?; Ok(ret) } @@ -237,14 +246,14 @@ impl EdgeTypeManager { } pub fn contains_edge(&self, label: LabelId) -> bool { - let guard = epoch::pin(); - let inner = self.get_inner(&guard); + let guard = &epoch::pin(); + let inner = self.get_inner(guard); inner.contains_edge(label) } pub fn contains_edge_kind(&self, si: SnapshotId, kind: &EdgeKind) -> bool { - let guard = epoch::pin(); - let inner = self.get_inner(&guard); + let guard = &epoch::pin(); + let inner = self.get_inner(guard); inner.contains_edge_kind(si, kind) } @@ -272,24 +281,13 @@ impl EdgeTypeManager { fn modify E>(&self, f: F) -> E { let guard = &epoch::pin(); - let mut inner_clone = unsafe { - self.inner - .load(Ordering::Relaxed, guard) - .deref() - .clone() - }; + let inner = self.inner.load(Ordering::Relaxed, guard); + let mut inner_clone = unsafe { inner.deref() }.clone(); let res = f(&mut inner_clone); - let p = self - .inner - .swap(Owned::new(inner_clone).into_shared(guard), Ordering::Relaxed, guard); - if !p.is_null() { - unsafe { - // guard.defer_destroy(p); - guard.defer_unchecked(move || { - trace!("EdgeManagerInner is now being deallocated."); - drop(p.into_owned()); - }); - } + self.inner + .store(Owned::new(inner_clone).into_shared(guard), Ordering::Relaxed); + unsafe { + guard.defer_destroy(inner); } res } diff --git a/interactive_engine/executor/store/groot/src/db/graph/types/vertex.rs b/interactive_engine/executor/store/groot/src/db/graph/types/vertex.rs index 271126f5598a..2d52f7485ba3 100644 --- a/interactive_engine/executor/store/groot/src/db/graph/types/vertex.rs +++ b/interactive_engine/executor/store/groot/src/db/graph/types/vertex.rs @@ -129,14 +129,22 @@ pub struct VertexTypeManager { map: Atomic, } +impl Drop for VertexTypeManager { + fn drop(&mut self) { + unsafe { + drop(std::mem::replace(&mut self.map, Atomic::null()).into_owned()); + } + } +} + impl VertexTypeManager { pub fn new() -> Self { VertexTypeManager { map: Atomic::new(VertexMap::new()) } } pub fn contains_type(&self, _si: SnapshotId, label: LabelId) -> bool { - let guard = epoch::pin(); - let map = self.get_map(&guard); + let guard = &epoch::pin(); + let map = self.get_map(guard); map.contains_key(&label) } @@ -144,25 +152,23 @@ impl VertexTypeManager { &self, si: SnapshotId, label: LabelId, codec: Codec, table0: Table, ) -> GraphResult<()> { assert_eq!(si, table0.start_si, "type start si must be equal to table0.start_si"); - unsafe { - let guard = epoch::pin(); - let map = self.get_shared_map(&guard); - let map_ref = map.as_ref().unwrap(); - if map_ref.contains_key(&label) { - let msg = format!("vertex#{} already exists", label); - let err = gen_graph_err!(GraphErrorCode::InvalidOperation, msg, create_type); - return Err(err); - } - let info = VertexTypeInfo::new(si, label); - res_unwrap!(info.update_codec(si, codec), create_type)?; - res_unwrap!(info.online_table(table0), create_type)?; - let mut map_clone = map_ref.clone(); - map_clone.insert(label, Arc::new(info)); - self.map - .store(Owned::new(map_clone), Ordering::Relaxed); - guard.defer_destroy(map); - Ok(()) + + let guard = &epoch::pin(); + let map = self.get_shared_map(guard); + let mut map_clone = unsafe { map.deref() }.clone(); + if map_clone.contains_key(&label) { + let msg = format!("vertex#{} already exists", label); + let err = gen_graph_err!(GraphErrorCode::InvalidOperation, msg, create_type); + return Err(err); } + let info = VertexTypeInfo::new(si, label); + res_unwrap!(info.update_codec(si, codec), create_type)?; + res_unwrap!(info.online_table(table0), create_type)?; + map_clone.insert(label, Arc::new(info)); + self.map + .store(Owned::new(map_clone).into_shared(guard), Ordering::Relaxed); + unsafe { guard.defer_destroy(map) }; + Ok(()) } pub fn get_type(&self, si: SnapshotId, label: LabelId) -> GraphResult { @@ -183,8 +189,8 @@ impl VertexTypeManager { } pub fn get_type_info(&self, si: SnapshotId, label: LabelId) -> GraphResult> { - let guard = epoch::pin(); - let map = self.get_map(&guard); + let guard = &epoch::pin(); + let map = self.get_map(guard); if let Some(info) = map.get(&label) { if info.is_alive_at(si) { let ret = info.clone(); @@ -206,8 +212,8 @@ impl VertexTypeManager { } pub fn drop_type(&self, si: SnapshotId, label: LabelId) -> GraphResult<()> { - let guard = epoch::pin(); - let map = self.get_map(&guard); + let guard = &epoch::pin(); + let map = self.get_map(guard); if let Some(info) = map.get(&label) { info.lifetime.set_end(si); } @@ -215,29 +221,27 @@ impl VertexTypeManager { } pub fn gc(&self, si: SnapshotId) -> GraphResult> { - unsafe { - let guard = epoch::pin(); - let map = self.get_shared_map(&guard); - let map_ref: &VertexMap = map.deref(); - let mut b = Vec::new(); - let mut table_ids = Vec::new(); - for (label, info) in map_ref { - table_ids.append(&mut info.gc(si)?); - if info.is_obsolete_at(si) { - b.push(*label); - } + let guard = &epoch::pin(); + let map = self.get_shared_map(guard); + let map_ref: &VertexMap = unsafe { map.deref() }; + let mut b = Vec::new(); + let mut table_ids = Vec::new(); + for (label, info) in map_ref { + table_ids.append(&mut info.gc(si)?); + if info.is_obsolete_at(si) { + b.push(*label); } - if !b.is_empty() { - let mut map_clone = map_ref.clone(); - for label in b { - map_clone.remove(&label); - } - self.map - .store(Owned::new(map_clone), Ordering::Relaxed); - guard.defer_destroy(map); + } + if !b.is_empty() { + let mut map_clone = map_ref.clone(); + for label in b { + map_clone.remove(&label); } - Ok(table_ids) + self.map + .store(Owned::new(map_clone).into_shared(guard), Ordering::Relaxed); + unsafe { guard.defer_destroy(map) }; } + Ok(table_ids) } fn get_map(&self, guard: &Guard) -> &'static VertexMap { diff --git a/interactive_engine/executor/store/groot/src/db/graph/version.rs b/interactive_engine/executor/store/groot/src/db/graph/version.rs index 9b79d769de23..3124a615568e 100644 --- a/interactive_engine/executor/store/groot/src/db/graph/version.rs +++ b/interactive_engine/executor/store/groot/src/db/graph/version.rs @@ -115,7 +115,9 @@ impl VersionManager { break; } } - self.head.store(head, Ordering::Relaxed); + if !ret.is_empty() { + self.head.store(head, Ordering::Relaxed); + } Ok(ret) }