Skip to content

Commit

Permalink
[GIE/Runtime] Refine some error infos in Runtime (#2113)
Browse files Browse the repository at this point in the history
* [GIE/Runtime] Refine some error infos in Runtime

* [GIE/Runtime] Refine some error infos in Runtime

* [GIE/Runtime] refine runtime error info

* [GIE/Runtime] refine runtime error info in vertex shuffle
  • Loading branch information
BingqingLyu committed Oct 12, 2022
1 parent d16b1b4 commit 31829fd
Show file tree
Hide file tree
Showing 27 changed files with 97 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl ReadGraph for ExpStore {
fn index_scan_vertex(
&self, _label: LabelId, _primary_key: &PKV, _params: &QueryParams,
) -> GraphProxyResult<Option<Vertex>> {
Err(GraphProxyError::query_store_error(
Err(GraphProxyError::unsupported_error(
"Experiment storage does not support index_scan_vertex for now",
))?
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ impl Partitioner for GrootMultiPartition {
let server_index = self
.graph_partition_manager
.get_server_id(partition_id as PartitionId)
.ok_or(GraphProxyError::query_store_error("get server id failed in graph_partition_manager"))?
as u64;
.ok_or(GraphProxyError::query_store_error(&format!(
"get server id failed on Groot with vid of {:?}, partition_id of {:?}",
vid, partition_id
)))? as u64;
let worker_index = partition_id % worker_num_per_server;
Ok(server_index * worker_num_per_server + worker_index as u64)
}
Expand Down Expand Up @@ -119,9 +121,10 @@ impl Partitioner for VineyardMultiPartition {
let server_index = *self
.partition_server_index_mapping
.get(&partition_id)
.ok_or(GraphProxyError::query_store_error(
"get server index by given partition id failed in Vineyard",
))? as u64;
.ok_or(GraphProxyError::query_store_error(&format!(
"get server id failed on Vineyard with vid of {:?}, partition_id of {:?}",
vid, partition_id
)))? as u64;
let worker_index = partition_id as u64 % worker_num_per_server;
Ok(server_index * worker_num_per_server + worker_index)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ fn extract_needed_columns(
filter: Option<&Arc<PEvaluator>>, out_columns: Option<&Vec<PropId>>,
) -> GraphProxyResult<Option<Vec<PropId>>> {
use ahash::HashSet;

use super::translation::zip_option_vecs;

// Some(vec[]) means need all props, so can't merge it with props needed in filter
Expand Down
2 changes: 1 addition & 1 deletion interactive_engine/executor/ir/runtime/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl std::fmt::Display for FnGenError {
FnGenError::ParseError(e) => write!(f, "Parse pb error in fn gen {}", e),
FnGenError::NullGraphError => write!(f, "Null graph store error in fn gen",),
FnGenError::StoreError(e) => write!(f, "Query store error in fn gen {}", e),
FnGenError::UnSupported(e) => write!(f, "Op not supported error in fn gen {}", e),
FnGenError::UnSupported(e) => write!(f, "Unsupported error in fn gen {}", e),
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,10 @@ impl AccumFactoryGen for algebra_pb::GroupBy {
if agg_func.vars.len() > 1 {
// e.g., count_distinct((a,b));
// TODO: to support this, we may need to define MultiTagKey (could define TagKey Trait, and impl for SingleTagKey and MultiTagKey)
Err(FnGenError::unsupported_error("Do not support to aggregate multiple fields yet"))?
Err(FnGenError::unsupported_error(&format!(
"aggregate multiple fields in `Accum`, fields are {:?}",
agg_func.vars
)))?
}
let tag_key = agg_func
.vars
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,10 @@

mod accum;
pub mod accumulator;

pub use accum::RecordAccumulator;
use ir_common::error::ParsePbError;
use ir_common::generated::algebra as algebra_pb;

use crate::error::FnGenResult;

pub trait AccumFactoryGen {
fn gen_accum(self) -> FnGenResult<RecordAccumulator>;
}

impl AccumFactoryGen for algebra_pb::logical_plan::Operator {
fn gen_accum(self) -> FnGenResult<RecordAccumulator> {
if let Some(opr) = self.opr {
match opr {
// TODO: it should be different for group and fold; For fold, we should further consider fold_partition and fold;
algebra_pb::logical_plan::operator::Opr::GroupBy(group) => group.gen_accum(),
_ => Err(ParsePbError::from("algebra_pb op is not a accum op").into()),
}
} else {
Err(ParsePbError::from("algebra op is empty").into())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl FilterFuncGen for algebra_pb::logical_plan::operator::Opr {
fn gen_filter(self) -> FnGenResult<Box<dyn FilterFunction<Record>>> {
match self {
algebra_pb::logical_plan::operator::Opr::Select(select) => select.gen_filter(),
_ => Err(ParsePbError::from("algebra_pb op is not a filter"))?,
_ => Err(ParsePbError::from(format!("the operator is not a `Filter`, it is {:?}", self)))?,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ impl FlatMapFuncGen for algebra_pb::FusedOperator {
} else if let Ok(flat_map) = inner_op.gen_flat_map() {
funcs.push(FusedFunc::FlatMap(flat_map));
} else {
return Err(FnGenError::UnSupported(format!(
"the operator: {:?} cannot be fused as it is neither `FilterMap` or `FlatMap`",
return Err(FnGenError::unsupported_error(&format!(
"neither `FilterMap` or `FlatMap` operator to fuse, the operator is {:?}",
op
)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ impl FlatMapFunction<Record, Record> for GetBothVOperator {
Box::new(vec![src_vertex, dst_vertex].into_iter()),
)))
} else {
Err(FnExecError::unexpected_data_error(
"Can only apply `GetV` with BothV opt (`Auxilia` instead) on an edge entry",
))?
Err(FnExecError::unexpected_data_error(&format!(
"Can't apply `GetV` with BothV opt (`Auxilia` instead) on an non-edge entry {:?}",
entry
)))?
}
} else {
Ok(Box::new(vec![].into_iter()))
Expand All @@ -68,9 +69,10 @@ impl FlatMapFuncGen for algebra_pb::GetV {
.transpose()?;
let opt: VOpt = unsafe { ::std::mem::transmute(self.opt) };
match opt {
VOpt::Start | VOpt::End | VOpt::Other => {
Err(ParsePbError::ParseError(format!("GetV with VOpt {:?} is not a flatmap op", opt)))?
}
VOpt::Start | VOpt::End | VOpt::Other => Err(ParsePbError::from(format!(
"the `GetV` operator is not a `FlatMap`, which has GetV::VOpt: {:?}",
opt
)))?,
VOpt::Both => {}
}
let alias = self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ impl FlatMapFuncGen for algebra_pb::logical_plan::operator::Opr {
algebra_pb::logical_plan::operator::Opr::Edge(edge_expand) => edge_expand.gen_flat_map(),
algebra_pb::logical_plan::operator::Opr::Vertex(get_vertex) => get_vertex.gen_flat_map(),
algebra_pb::logical_plan::operator::Opr::Unfold(_unfold) => {
Err(FnGenError::unsupported_error("unfold is not supported yet"))
Err(FnGenError::unsupported_error("`Unfold` opr"))
}
algebra_pb::logical_plan::operator::Opr::Fused(fused) => fused.gen_flat_map(),
_ => Err(ParsePbError::ParseError(format!("the operator: {:?} is not a `FlatMap`", self)))?,
_ => Err(ParsePbError::from(format!("the operator is not a `FlatMap`, it is {:?}", self)))?,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl FoldGen<u64, Record> for algebra_pb::GroupBy {
.transpose()?;
Ok(Box::new(CountAlias { alias: count_alias }))
} else {
Err(FnGenError::unsupported_error("Do not support fold_map except simple count"))
Err(FnGenError::unsupported_error(&format!("fold_map in `Accum` {:?}", self)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ impl GroupGen<Record, RecordKey, Record> for algebra_pb::GroupBy {
.ok_or(ParsePbError::from("key alias is missing in group"))?
.try_into()?,
);
let alias = alias.ok_or(ParsePbError::from("key alias cannot be None in group"))?;
let alias = alias
.ok_or(ParsePbError::from(format!("key alias cannot be None in group opr {:?}", self)))?;
key_aliases.push(alias);
}
let group_map = GroupMap { key_aliases };
Expand All @@ -64,9 +65,10 @@ impl MapFunction<(RecordKey, Record), Record> for GroupMap {
fn exec(&self, (group_key, mut group_value): (RecordKey, Record)) -> FnResult<Record> {
let group_key_entries = group_key.take();
if group_key_entries.len() != self.key_aliases.len() {
Err(FnExecError::unexpected_data_error(
"the number of group_keys and group_key_aliases should be equal",
))?
Err(FnExecError::unexpected_data_error(&format!(
"the number of group_keys and group_key_aliases should be equal: {:?}, {:?}",
group_key_entries, self.key_aliases
)))?
}
for (entry, alias) in group_key_entries
.iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl GroupFunctionGen for algebra_pb::logical_plan::operator::Opr {
fn gen_group(self) -> FnGenResult<Box<dyn GroupGen<Record, RecordKey, Record>>> {
match self {
algebra_pb::logical_plan::operator::Opr::GroupBy(group) => Ok(Box::new(group)),
_ => Err(ParsePbError::from("algebra_pb op is not a group op").into()),
_ => Err(ParsePbError::from(format!("the operator is not a `Group`, it is {:?}", self)))?,
}
}
}
Expand All @@ -44,7 +44,7 @@ impl FoldFactoryGen for algebra_pb::logical_plan::operator::Opr {
fn gen_fold(self) -> FnGenResult<Box<dyn FoldGen<u64, Record>>> {
match self {
algebra_pb::logical_plan::operator::Opr::GroupBy(non_key_group) => Ok(Box::new(non_key_group)),
_ => Err(ParsePbError::from("algebra_pb op is not a fold op").into()),
_ => Err(ParsePbError::from(format!("the operator is not a `Fold`, it is {:?}", self)))?,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl JoinFunctionGen for algebra_pb::logical_plan::operator::Opr {
fn gen_join(self) -> FnGenResult<Box<dyn JoinKeyGen<Record, RecordKey, Record>>> {
match self {
algebra_pb::logical_plan::operator::Opr::Join(join) => Ok(Box::new(join)),
_ => Err(ParsePbError::from("algebra_pb op is not a keyed op"))?,
_ => Err(ParsePbError::from(format!("the operatoris not a `Join`, it is {:?} ", self)))?,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ impl KeyFunctionGen for algebra_pb::logical_plan::operator::Opr {
algebra_pb::logical_plan::operator::Opr::GroupBy(group) => group.gen_key(),
algebra_pb::logical_plan::operator::Opr::Dedup(dedup) => dedup.gen_key(),
algebra_pb::logical_plan::operator::Opr::SegApply(_seg_apply) => {
Err(FnGenError::unsupported_error("SegApply is not supported yet"))?
Err(FnGenError::unsupported_error("`SegApply` opr"))?
}
_ => {
Err(ParsePbError::from(format!("the operator is not a keyed operator, it is {:?}", self)))?
}
_ => Err(ParsePbError::from("algebra_pb op is not a keyed op"))?,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ impl FilterMapFunction<Record, Record> for AuxiliaOperator {
edge.into()
})
} else {
Err(FnExecError::unexpected_data_error("should be vertex or edge in AuxiliaOperator"))?
Err(FnExecError::unexpected_data_error(&format!(
"neither Vertex nor Edge entry is accessed in `Auxilia` operator, the entry is {:?}",
entry
)))?
};
if new_entry.is_some() {
input.append(new_entry.unwrap(), self.alias.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl FilterMapFuncGen for algebra_pb::GetV {
.transpose()?;
let opt: VOpt = unsafe { ::std::mem::transmute(self.opt) };
if let VOpt::Both = opt {
Err(ParsePbError::ParseError("GetV with VOpt::Both is not a map op".to_string()))?
Err(ParsePbError::from("the `GetV` operator is not a `FilterMap`, which has GetV::VOpt::Both"))?
}
let alias = self
.alias
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl FilterMapFuncGen for algebra_pb::logical_plan::operator::Opr {
algebra_pb::logical_plan::operator::Opr::PathStart(path_start) => path_start.gen_filter_map(),
algebra_pb::logical_plan::operator::Opr::Project(project) => project.gen_filter_map(),
algebra_pb::logical_plan::operator::Opr::Auxilia(auxilia) => auxilia.gen_filter_map(),
_ => Err(ParsePbError::ParseError(format!("the operator: {:?} is not a `Map`", self)))?,
_ => Err(ParsePbError::from(format!("the operator is not a `FilterMap`, it is {:?}", self)))?,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ impl MapFunction<Record, Record> for PathEndOperator {
fn exec(&self, mut input: Record) -> FnResult<Record> {
let entry = input
.get(None)
.ok_or(FnExecError::get_tag_error("get tag failed in GetVertexOperator"))?
.ok_or(FnExecError::get_tag_error(&format!(
"get None tag from the current record in `PathEnd` operator, the record is {:?}",
input
)))?
.clone();
if self.alias.is_some() {
input.append_arc_entry(entry.clone(), self.alias.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ impl FilterMapFunction<Record, Record> for PathStartOperator {
if let Some(entry) = input.get(self.start_tag) {
let v = entry
.as_graph_vertex()
.ok_or(FnExecError::unexpected_data_error(
"tag does not refer to a graph vertex element",
))?;
.ok_or(FnExecError::unexpected_data_error(&format!(
"tag {:?} does not refer to a graph vertex element in record {:?}",
self.start_tag, input
)))?;
let graph_path = GraphPath::new(v.clone(), self.path_opt, self.result_opt);
input.append(graph_path, None);
Ok(Some(input))
Expand Down
20 changes: 11 additions & 9 deletions interactive_engine/executor/ir/runtime/src/process/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ impl TagKey {
}
}

fn get_key(&self, element: &Arc<Entry>, prop_key: &PropKey) -> FnExecResult<Entry> {
if let Some(element) = element.as_graph_element() {
fn get_key(&self, entry: &Arc<Entry>, prop_key: &PropKey) -> FnExecResult<Entry> {
if let Some(element) = entry.as_graph_element() {
let prop_obj = match prop_key {
PropKey::Id => element.id().into(),
PropKey::Label => element
Expand All @@ -85,9 +85,10 @@ impl TagKey {
PropKey::All => {
let details = element
.details()
.ok_or(FnExecError::unexpected_data_error(
"Get key failed since get details from a graph element failed",
))?;
.ok_or(FnExecError::unexpected_data_error(&format!(
"Get `PropKey::All` on {:?}",
entry,
)))?;

if let Some(properties) = details.get_all_properties() {
properties
Expand All @@ -108,9 +109,10 @@ impl TagKey {
PropKey::Key(key) => {
let details = element
.details()
.ok_or(FnExecError::unexpected_data_error(
"Get key failed since get details from a graph element failed",
))?;
.ok_or(FnExecError::unexpected_data_error(&format!(
"Get `PropKey::Key` of {:?} on {:?}",
key, entry,
)))?;
if let Some(properties) = details.get_property(key) {
properties
.try_to_owned()
Expand All @@ -126,7 +128,7 @@ impl TagKey {
Err(FnExecError::unexpected_data_error(&format!(
"
Get key failed since get details from a none-graph element {:?} ",
element
entry
)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl SinkGen for algebra_pb::logical_plan::operator::Opr {
Err(ParsePbError::EmptyFieldError("sink_target is missing".to_string()))?
}
}
_ => Err(ParsePbError::from("algebra_pb op is not a sink op"))?,
_ => Err(ParsePbError::from(format!("the operator is not a `Sink`, it is {:?}", self)))?,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ impl Accumulator<Record, Record> for GraphSinkEncoder {
for sink_key in &self.sink_keys {
let entry = next
.take(sink_key.as_ref())
.ok_or(FnExecError::get_tag_error(&format!("tag {:?} in GraphWriter", sink_key)))?;
.ok_or(FnExecError::get_tag_error(&format!(
"tag {:?} in GraphWriter on {:?}",
sink_key, next
)))?;
if let Some(v) = entry.as_graph_vertex() {
let vertex_pk = graph
.get_primary_key(&v.id())?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl CompareFunctionGen for algebra_pb::logical_plan::operator::Opr {
fn gen_cmp(self) -> FnGenResult<Box<dyn CompareFunction<Record>>> {
match self {
algebra_pb::logical_plan::operator::Opr::OrderBy(order) => order.gen_cmp(),
_ => Err(ParsePbError::from("algebra_pb op is not a order").into()),
_ => Err(ParsePbError::from(format!("the operator is not an `Order`, it is {:?}", self)))?,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ impl SourceOperator {
}
Err(err) => {
debug!("get partition list failed in graph_partition_manager in source op {:?}", err);
Err(ParsePbError::Unsupported(
format!("get partition list failed in graph_partition_manager in source op {:?}", err)
.to_string(),
))?
Err(ParsePbError::Unsupported(format!(
"get partition list failed in graph_partition_manager in source op {:?}",
err
)))?
}
}
Ok(())
Expand All @@ -140,7 +140,10 @@ impl SourceOperator {
}
} else if let Some(ref indexed_values) = self.primary_key_values {
if self.query_params.labels.len() != 1 {
Err(FnGenError::unsupported_error("indexed_scan with empty/multiple labels"))?
Err(FnGenError::unsupported_error(&format!(
"Empty/Multiple labels in `IndexScan`, labels are {:?}",
self.query_params.labels
)))?
}
if let Some(v) = graph.index_scan_vertex(
self.query_params.labels[0],
Expand Down Expand Up @@ -169,7 +172,9 @@ impl SourceOperator {
}
Ok(Box::new(e_source.map(move |e| Record::new(e, self.alias.clone()))))
}
SourceType::Table => Err(FnGenError::unsupported_error("data source of `Table` type"))?,
SourceType::Table => Err(FnGenError::unsupported_error(
"neither `Edge` nor `Vertex` but `Table` type `Source` opr",
))?,
}
}
}
Expand Down
Loading

0 comments on commit 31829fd

Please sign in to comment.