Skip to content

Commit

Permalink
[Bug Fix] fix a bug in duplicated results by index scan (#1865)
Browse files Browse the repository at this point in the history
  • Loading branch information
BingqingLyu committed Jul 21, 2022
1 parent fd9b55f commit d024cda
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,27 +124,34 @@ where
fn index_scan_vertex(
&self, label_id: &NameOrId, primary_key: &PKV, _params: &QueryParams,
) -> GraphProxyResult<Option<Vertex>> {
let store_label_id = encode_storage_label(label_id)?;
let store_indexed_values = match primary_key {
OneOrMany::One(pkv) => {
vec![encode_store_prop_val(pkv[0].1.clone())]
// get_vertex_id_by_primary_keys() is a global query function, that is,
// you can query vertices (with only vertex id) by pks on any graph partitions (not matter locally or remotely).
// To guarantee the correctness (i.e., avoid duplication results), only worker 0 is assigned for query.
if pegasus::get_current_worker().index == 0 {
let store_label_id = encode_storage_label(label_id)?;
let store_indexed_values = match primary_key {
OneOrMany::One(pkv) => {
vec![encode_store_prop_val(pkv[0].1.clone())]
}
OneOrMany::Many(pkvs) => pkvs
.iter()
.map(|(_pk, value)| encode_store_prop_val(value.clone()))
.collect(),
};

if let Some(vid) = self
.partition_manager
.get_vertex_id_by_primary_keys(store_label_id, store_indexed_values.as_ref())
{
// TODO: confirm if this should be lazy details?
Ok(Some(Vertex::new(
vid as ID,
Some(label_id.clone()),
DynDetails::new(DefaultDetails::default()),
)))
} else {
Ok(None)
}
OneOrMany::Many(pkvs) => pkvs
.iter()
.map(|(_pk, value)| encode_store_prop_val(value.clone()))
.collect(),
};

if let Some(vid) = self
.partition_manager
.get_vertex_id_by_primary_keys(store_label_id, store_indexed_values.as_ref())
{
// TODO: confirm if this should be lazy details?
Ok(Some(Vertex::new(
vid as ID,
Some(label_id.clone()),
DynDetails::new(DefaultDetails::default()),
)))
} else {
Ok(None)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ impl SourceOperator {
}
}
} else if let Some(ref indexed_values) = self.primary_key_values {
// parallel indexed scan
if self.query_params.labels.len() != 1 {
Err(FnGenError::unsupported_error("indexed_scan with empty/multiple labels"))?
}
Expand Down

0 comments on commit d024cda

Please sign in to comment.