Skip to content

Commit

Permalink
[Bug Fix] Fix bugs in CI of GAIA on Vineyard (#652)
Browse files Browse the repository at this point in the history
* [Bug Fix] fix bug when query is not processed by dataflow
* [Bug Fix] impl get_property and get_properties for IdOnlyVertex in vineyard
  • Loading branch information
BingqingLyu committed Aug 4, 2021
1 parent 4fe934a commit 36d7bc4
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,9 @@ void get_edge_properties(FRAGMENT_TYPE* frag, LabelId label, int64_t offset,
}

int properties_next(PropertiesIteratorImpl* iter, Property* p_out) {
if (iter==nullptr) {
return -1;
}
while (iter->col_id < iter->col_num &&
iter->table->field(iter->col_id)->type() == arrow::null()) {
++iter->col_id;
Expand Down
10 changes: 8 additions & 2 deletions interactive_engine/src/executor/runtime/src/store/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,11 +569,11 @@ impl Vertex for IdOnlyVertex {
}

fn get_property(&self, prop_id: u32) -> Option<Property> {
unimplemented!()
None
}

fn get_properties(&self) -> Self::PI {
unimplemented!()
FFIPropertiesIter::empty()
}
}

Expand Down Expand Up @@ -871,6 +871,12 @@ impl FFIPropertiesIter {
iter,
}
}

pub fn empty() -> Self {
FFIPropertiesIter {
iter: std::ptr::null(),
}
}
}

impl Drop for FFIPropertiesIter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,15 @@ impl Partitioner for VineyardMultiPartition {

fn get_worker_partitions(
&self,
_job_workers: usize,
job_workers: usize,
worker_id: u32,
) -> DynResult<Option<Vec<u64>>> {
// If only one worker each server, it will process all partitions
if job_workers == 1 {
Ok(Some(self.graph_partition_manager.get_process_partition_list().into_iter().map(|pid| pid as u64).collect()))
}
// Vineyard will pre-allocate the worker_partition_list mapping
if let Ok(worker_partition_list_mapping) = self.worker_partition_list_mapping.read() {
else if let Ok(worker_partition_list_mapping) = self.worker_partition_list_mapping.read() {
if let Some(worker_partition_list_mapping) = worker_partition_list_mapping.as_ref() {
if let Some(partition_list) = worker_partition_list_mapping.get(&worker_id) {
Ok(Some(partition_list.iter().map(|pid| *pid as u64).collect()))
Expand Down

0 comments on commit 36d7bc4

Please sign in to comment.