diff --git a/interactive_engine/executor/ir/runtime/src/assembly.rs b/interactive_engine/executor/ir/runtime/src/assembly.rs index f70a44696be4..d4028630b289 100644 --- a/interactive_engine/executor/ir/runtime/src/assembly.rs +++ b/interactive_engine/executor/ir/runtime/src/assembly.rs @@ -13,6 +13,7 @@ //! See the License for the specific language governing permissions and //! limitations under the License. +use std::convert::TryInto; use std::sync::Arc; use graph_proxy::apis::Partitioner; @@ -31,7 +32,6 @@ use pegasus::{BuildJobError, Worker}; use pegasus_server::job::{JobAssembly, JobDesc}; use pegasus_server::job_pb as server_pb; use prost::Message; -use std::convert::TryInto; use crate::error::{FnExecError, FnGenError, FnGenResult}; use crate::process::functions::{ApplyGen, CompareFunction, FoldGen, GroupGen, JoinKeyGen, KeyFunction}; diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/sink/mod.rs b/interactive_engine/executor/ir/runtime/src/process/operator/sink/mod.rs index 5967896981dc..605b51834cc3 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/sink/mod.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/sink/mod.rs @@ -16,6 +16,8 @@ mod sink; #[cfg(feature = "with_v6d")] mod sink_vineyard; +use std::convert::TryInto; + use ir_common::error::ParsePbError; use ir_common::generated::algebra as algebra_pb; use ir_common::generated::physical as pb; @@ -24,7 +26,6 @@ use crate::error::FnGenResult; use crate::process::operator::sink::sink::{DefaultSinkOp, RecordSinkEncoder}; #[cfg(feature = "with_v6d")] use crate::process::operator::sink::sink_vineyard::{GraphSinkEncoder, SinkVineyardOp}; -use std::convert::TryInto; pub enum Sinker { DefaultSinker(RecordSinkEncoder), diff --git a/interactive_engine/executor/store/mcsr/src/bin/single_laser.rs b/interactive_engine/executor/store/mcsr/src/bin/single_laser.rs index ad6f7a5b81d9..b87be675ba9b 100644 --- a/interactive_engine/executor/store/mcsr/src/bin/single_laser.rs +++ b/interactive_engine/executor/store/mcsr/src/bin/single_laser.rs @@ -97,7 +97,7 @@ fn main() { std::fs::create_dir_all(&out_dir).expect("Create graph schema directory error"); } let graph_schema = - CsrGraphSchema::from_json_file(&graph_schema_file).expect("Read trimed schema error!"); + CsrGraphSchema::from_json_file(&graph_schema_file).expect("Read graph schema error!"); graph_schema .to_json_file(&out_dir.join(FILE_SCHEMA)) .expect("Write graph schema error!"); diff --git a/interactive_engine/executor/store/mcsr/src/graph.rs b/interactive_engine/executor/store/mcsr/src/graph.rs index 84da18aa4899..44845eec09af 100644 --- a/interactive_engine/executor/store/mcsr/src/graph.rs +++ b/interactive_engine/executor/store/mcsr/src/graph.rs @@ -1,19 +1,19 @@ -use pegasus_common::codec::ReadExt; -use pegasus_common::io::WriteExt; use std::fmt; use std::hash::Hash; use std::ops::AddAssign; +use pegasus_common::codec::ReadExt; +use pegasus_common::io::WriteExt; + /// Trait for the unsigned integer type used for node and edge indices. /// /// Marked `unsafe` because: the trait must faithfully preserve /// and convert index values. -pub unsafe trait IndexType: - Copy + Default + Hash + Ord + fmt::Debug + 'static + AddAssign -{ +pub unsafe trait IndexType: Copy + Default + Hash + Ord + fmt::Debug + 'static + AddAssign { fn new(x: usize) -> Self; fn index(&self) -> usize; fn max() -> Self; + fn add_assign(&mut self, other: Self); fn write_to(&self, writer: &mut W) -> std::io::Result<()>; diff --git a/interactive_engine/executor/store/mcsr/src/ldbc_parser.rs b/interactive_engine/executor/store/mcsr/src/ldbc_parser.rs index 03ddf93fc35f..78bb723fac85 100644 --- a/interactive_engine/executor/store/mcsr/src/ldbc_parser.rs +++ b/interactive_engine/executor/store/mcsr/src/ldbc_parser.rs @@ -1,7 +1,8 @@ -use csv::StringRecord; use std::marker::PhantomData; use std::str::FromStr; +use csv::StringRecord; + use crate::graph::IndexType; use crate::types::*; @@ -30,8 +31,7 @@ pub struct LDBCVertexParser { ph: PhantomData, } -pub const LABEL_SHIFT_BITS: usize = - 8 * (std::mem::size_of::() - std::mem::size_of::()); +pub const LABEL_SHIFT_BITS: usize = 8 * (std::mem::size_of::() - std::mem::size_of::()); impl LDBCVertexParser { pub fn to_global_id(ldbc_id: usize, label_id: LabelId) -> G { @@ -51,22 +51,19 @@ impl LDBCVertexParser { impl LDBCVertexParser { pub fn new(vertex_type: LabelId, id_index: usize) -> Self { - Self { - vertex_type, - id_index, - ph: PhantomData, - } + Self { vertex_type, id_index, ph: PhantomData } } pub fn parse_vertex_meta(&self, record: &StringRecord) -> VertexMeta { let global_id = Self::to_global_id( - record.get(self.id_index).unwrap().parse::().unwrap(), + record + .get(self.id_index) + .unwrap() + .parse::() + .unwrap(), self.vertex_type, ); - VertexMeta { - global_id, - label: self.vertex_type, - } + VertexMeta { global_id, label: self.vertex_type } } } @@ -83,14 +80,7 @@ pub struct LDBCEdgeParser { impl LDBCEdgeParser { pub fn new(src_vertex_type: LabelId, dst_vertex_type: LabelId, edge_type: LabelId) -> Self { - Self { - src_vertex_type, - dst_vertex_type, - edge_type, - src_col_id: 0, - dst_col_id: 1, - ph: PhantomData, - } + Self { src_vertex_type, dst_vertex_type, edge_type, src_col_id: 0, dst_col_id: 1, ph: PhantomData } } pub fn with_endpoint_col_id(&mut self, src_col_id: usize, dst_col_id: usize) {