Skip to content

Commit

Permalink
Merge branch 'main' into ir_call_procedure
Browse files Browse the repository at this point in the history
  • Loading branch information
shirly121 committed Jul 5, 2023
2 parents c89c41e + ac7956e commit c4ff771
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 68 deletions.
44 changes: 21 additions & 23 deletions interactive_engine/executor/store/groot/src/db/graph/types/edge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,15 @@ pub struct EdgeTypeManager {
inner: Atomic<EdgeManagerInner>,
}

// https://docs.rs/crossbeam-epoch/0.7.2/crossbeam_epoch/struct.Atomic.html#method.into_owned
impl Drop for EdgeTypeManager {
fn drop(&mut self) {
unsafe {
drop(std::mem::replace(&mut self.inner, Atomic::null()).into_owned());
}
}
}

impl EdgeTypeManager {
pub fn new() -> Self {
EdgeTypeManager { inner: Atomic::new(EdgeManagerInner::new()) }
Expand All @@ -222,8 +231,8 @@ impl EdgeTypeManager {
}

pub fn get_edge_info(&self, si: SnapshotId, label: LabelId) -> GraphResult<Arc<EdgeInfo>> {
let guard = epoch::pin();
let inner = self.get_inner(&guard);
let guard = &epoch::pin();
let inner = self.get_inner(guard);
let ret = res_unwrap!(inner.get_edge_info(si, label), get_edge, si, label)?;
Ok(ret)
}
Expand All @@ -237,14 +246,14 @@ impl EdgeTypeManager {
}

pub fn contains_edge(&self, label: LabelId) -> bool {
let guard = epoch::pin();
let inner = self.get_inner(&guard);
let guard = &epoch::pin();
let inner = self.get_inner(guard);
inner.contains_edge(label)
}

pub fn contains_edge_kind(&self, si: SnapshotId, kind: &EdgeKind) -> bool {
let guard = epoch::pin();
let inner = self.get_inner(&guard);
let guard = &epoch::pin();
let inner = self.get_inner(guard);
inner.contains_edge_kind(si, kind)
}

Expand Down Expand Up @@ -272,24 +281,13 @@ impl EdgeTypeManager {

fn modify<E, F: Fn(&mut EdgeManagerInner) -> E>(&self, f: F) -> E {
let guard = &epoch::pin();
let mut inner_clone = unsafe {
self.inner
.load(Ordering::Relaxed, guard)
.deref()
.clone()
};
let inner = self.inner.load(Ordering::Relaxed, guard);
let mut inner_clone = unsafe { inner.deref() }.clone();
let res = f(&mut inner_clone);
let p = self
.inner
.swap(Owned::new(inner_clone).into_shared(guard), Ordering::Relaxed, guard);
if !p.is_null() {
unsafe {
// guard.defer_destroy(p);
guard.defer_unchecked(move || {
trace!("EdgeManagerInner is now being deallocated.");
drop(p.into_owned());
});
}
self.inner
.store(Owned::new(inner_clone).into_shared(guard), Ordering::Relaxed);
unsafe {
guard.defer_destroy(inner);
}
res
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,40 +129,46 @@ pub struct VertexTypeManager {
map: Atomic<VertexMap>,
}

impl Drop for VertexTypeManager {
fn drop(&mut self) {
unsafe {
drop(std::mem::replace(&mut self.map, Atomic::null()).into_owned());
}
}
}

impl VertexTypeManager {
pub fn new() -> Self {
VertexTypeManager { map: Atomic::new(VertexMap::new()) }
}

pub fn contains_type(&self, _si: SnapshotId, label: LabelId) -> bool {
let guard = epoch::pin();
let map = self.get_map(&guard);
let guard = &epoch::pin();
let map = self.get_map(guard);
map.contains_key(&label)
}

pub fn create_type(
&self, si: SnapshotId, label: LabelId, codec: Codec, table0: Table,
) -> GraphResult<()> {
assert_eq!(si, table0.start_si, "type start si must be equal to table0.start_si");
unsafe {
let guard = epoch::pin();
let map = self.get_shared_map(&guard);
let map_ref = map.as_ref().unwrap();
if map_ref.contains_key(&label) {
let msg = format!("vertex#{} already exists", label);
let err = gen_graph_err!(GraphErrorCode::InvalidOperation, msg, create_type);
return Err(err);
}
let info = VertexTypeInfo::new(si, label);
res_unwrap!(info.update_codec(si, codec), create_type)?;
res_unwrap!(info.online_table(table0), create_type)?;
let mut map_clone = map_ref.clone();
map_clone.insert(label, Arc::new(info));
self.map
.store(Owned::new(map_clone), Ordering::Relaxed);
guard.defer_destroy(map);
Ok(())

let guard = &epoch::pin();
let map = self.get_shared_map(guard);
let mut map_clone = unsafe { map.deref() }.clone();
if map_clone.contains_key(&label) {
let msg = format!("vertex#{} already exists", label);
let err = gen_graph_err!(GraphErrorCode::InvalidOperation, msg, create_type);
return Err(err);
}
let info = VertexTypeInfo::new(si, label);
res_unwrap!(info.update_codec(si, codec), create_type)?;
res_unwrap!(info.online_table(table0), create_type)?;
map_clone.insert(label, Arc::new(info));
self.map
.store(Owned::new(map_clone).into_shared(guard), Ordering::Relaxed);
unsafe { guard.defer_destroy(map) };
Ok(())
}

pub fn get_type(&self, si: SnapshotId, label: LabelId) -> GraphResult<VertexTypeInfoRef> {
Expand All @@ -183,8 +189,8 @@ impl VertexTypeManager {
}

pub fn get_type_info(&self, si: SnapshotId, label: LabelId) -> GraphResult<Arc<VertexTypeInfo>> {
let guard = epoch::pin();
let map = self.get_map(&guard);
let guard = &epoch::pin();
let map = self.get_map(guard);
if let Some(info) = map.get(&label) {
if info.is_alive_at(si) {
let ret = info.clone();
Expand All @@ -206,38 +212,36 @@ impl VertexTypeManager {
}

pub fn drop_type(&self, si: SnapshotId, label: LabelId) -> GraphResult<()> {
let guard = epoch::pin();
let map = self.get_map(&guard);
let guard = &epoch::pin();
let map = self.get_map(guard);
if let Some(info) = map.get(&label) {
info.lifetime.set_end(si);
}
Ok(())
}

pub fn gc(&self, si: SnapshotId) -> GraphResult<Vec<TableId>> {
unsafe {
let guard = epoch::pin();
let map = self.get_shared_map(&guard);
let map_ref: &VertexMap = map.deref();
let mut b = Vec::new();
let mut table_ids = Vec::new();
for (label, info) in map_ref {
table_ids.append(&mut info.gc(si)?);
if info.is_obsolete_at(si) {
b.push(*label);
}
let guard = &epoch::pin();
let map = self.get_shared_map(guard);
let map_ref: &VertexMap = unsafe { map.deref() };
let mut b = Vec::new();
let mut table_ids = Vec::new();
for (label, info) in map_ref {
table_ids.append(&mut info.gc(si)?);
if info.is_obsolete_at(si) {
b.push(*label);
}
if !b.is_empty() {
let mut map_clone = map_ref.clone();
for label in b {
map_clone.remove(&label);
}
self.map
.store(Owned::new(map_clone), Ordering::Relaxed);
guard.defer_destroy(map);
}
if !b.is_empty() {
let mut map_clone = map_ref.clone();
for label in b {
map_clone.remove(&label);
}
Ok(table_ids)
self.map
.store(Owned::new(map_clone).into_shared(guard), Ordering::Relaxed);
unsafe { guard.defer_destroy(map) };
}
Ok(table_ids)
}

fn get_map(&self, guard: &Guard) -> &'static VertexMap {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ impl VersionManager {
break;
}
}
self.head.store(head, Ordering::Relaxed);
if !ret.is_empty() {
self.head.store(head, Ordering::Relaxed);
}
Ok(ret)
}

Expand Down

0 comments on commit c4ff771

Please sign in to comment.