Skip to content

Commit

Permalink
Refine gen source (#427)
Browse files Browse the repository at this point in the history
* [gremlin_core] gen source with Partitioner

* [gremin_core] fix gen source with Partitioner in tests/benches
  • Loading branch information
BingqingLyu committed Jun 24, 2021
1 parent 675d6ad commit 0ad8b42
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 71 deletions.
17 changes: 7 additions & 10 deletions research/gaia/gremlin/gremlin_core/benches/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,18 @@ pub mod benchmark {
fn source(&self, src: &[u8]) -> CompileResult<Box<dyn Iterator<Item = Traverser> + Send>> {
let mut gremlin_step = GremlinStepPb::decode(&src[0..])
.map_err(|e| format!("protobuf decode failure: {}", e))?;
let num_servers = self.inner.get_num_servers();
if let Some(worker_id) = pegasus::get_current_worker() {
let num_workers = worker_id.peers as usize / num_servers;
let mut step = graph_step_from(&mut gremlin_step, num_servers)?;
let num_workers = worker_id.peers as usize / self.inner.get_num_servers();
let mut step = graph_step_from(&mut gremlin_step, self.inner.get_partitioner())?;
step.set_src(self.substitute_src_ids.clone(), self.inner.get_partitioner());
step.set_num_workers(num_workers);
step.set_server_index(0);
step.set_src(self.substitute_src_ids.clone(), num_servers);
step.set_requirement(self.requirement);
Ok(step.gen_source(Some(worker_id.index as usize)))
Ok(step.gen_source(worker_id.index as usize))
} else {
let mut step = graph_step_from(&mut gremlin_step, num_servers)?;
step.set_server_index(0);
step.set_src(self.substitute_src_ids.clone(), num_servers);
let mut step = graph_step_from(&mut gremlin_step, self.inner.get_partitioner())?;
step.set_src(self.substitute_src_ids.clone(), self.inner.get_partitioner());
step.set_requirement(self.requirement);
Ok(step.gen_source(None))
Ok(step.gen_source(self.inner.get_server_index() as usize))
}
}

Expand Down
14 changes: 8 additions & 6 deletions research/gaia/gremlin/gremlin_core/src/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ impl GremlinJobCompiler {
pub fn get_server_index(&self) -> u64 {
self.server_index
}

pub fn get_partitioner(&self) -> Arc<dyn Partitioner> {
self.partitioner.clone()
}
}

impl JobCompiler<Traverser> for GremlinJobCompiler {
Expand Down Expand Up @@ -71,14 +75,12 @@ impl JobCompiler<Traverser> for GremlinJobCompiler {
let mut step = decode::<pb::gremlin::GremlinStep>(src)?;
if let Some(worker_id) = pegasus::get_current_worker() {
let num_workers = worker_id.peers as usize / self.num_servers;
let mut step = graph_step_from(&mut step, self.num_servers)?;
let mut step = graph_step_from(&mut step, self.partitioner.clone())?;
step.set_num_workers(num_workers);
step.set_server_index(self.server_index);
Ok(step.gen_source(Some(worker_id.index as usize)))
Ok(step.gen_source(worker_id.index as usize))
} else {
let mut step = graph_step_from(&mut step, self.num_servers)?;
step.set_server_index(self.server_index);
Ok(step.gen_source(None))
let step = graph_step_from(&mut step, self.partitioner.clone())?;
Ok(step.gen_source(self.server_index as usize))
}
}

Expand Down
5 changes: 5 additions & 0 deletions research/gaia/gremlin/gremlin_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ pub trait FromPb<T> {

pub trait Partitioner: Send + Sync + 'static {
fn get_partition(&self, id: &ID, job_workers: usize) -> u64;
fn get_partition_num(&self) -> usize;
}

/// A simple partition utility
Expand All @@ -126,6 +127,10 @@ impl Partitioner for Partition {
// to do the computation.
((id_usize - magic_num * self.num_servers) * workers + magic_num % workers) as u64
}

fn get_partition_num(&self) -> usize {
self.num_servers
}
}

pub struct TraverserSinkEncoder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ use crate::generated::gremlin::EntityType;
use crate::process::traversal::step::util::StepSymbol;
use crate::process::traversal::step::Step;
use crate::process::traversal::traverser::{Requirement, Traverser};
use crate::storage::encode_store_e_id;
use crate::structure::codec::pb_chain_to_filter;
use crate::structure::{Edge, Label, QueryParams, Vertex, ID};
use crate::FromPb;
use crate::{FromPb, Partitioner};
use bit_set::BitSet;
use graph_store::common::LabelId;
use pegasus::BuildJobError;
use pegasus_common::downcast::*;
use prost::alloc::str::FromStr;
use std::sync::Arc;

/// V(), E()
pub struct GraphVertexStep {
Expand All @@ -39,7 +39,6 @@ pub struct GraphVertexStep {
return_type: EntityType,
// workers per server, for gen_source
workers: usize,
server_index: u64,
}

impl_as_any!(GraphVertexStep);
Expand All @@ -56,7 +55,6 @@ impl GraphVertexStep {
e_params: QueryParams::new(),
return_type,
workers: 1,
server_index: 0,
}
}

Expand All @@ -68,26 +66,18 @@ impl GraphVertexStep {
self.workers = workers;
}

pub fn set_server_index(&mut self, index: u64) {
self.server_index = index;
}

pub fn set_src(&mut self, ids: Vec<ID>, server_num: usize, entity_type: EntityType) {
let mut partition = Vec::with_capacity(server_num);
for _ in 0..server_num {
partition.push(vec![]);
pub fn set_src(&mut self, ids: Vec<ID>, partitioner: Arc<dyn Partitioner>) {
let partition_num = partitioner.get_partition_num();
let mut partitions = Vec::with_capacity(partition_num);
for _ in 0..partition_num {
partitions.push(vec![]);
}
for id in ids {
let src_id = if entity_type == EntityType::Vertex {
id
} else {
let eid = encode_store_e_id(&id);
eid.0 as u128
};
let idx = (src_id % server_num as ID) as usize;
partition[idx].push(id);
let pid = partitioner.get_partition(&id, self.workers) as usize;
partitions[pid].push(id);
}
self.src = Some(partition);

self.src = Some(partitions);
}

pub fn set_requirement(&mut self, requirement: Requirement) {
Expand All @@ -105,50 +95,39 @@ impl Step for GraphVertexStep {
}

impl GraphVertexStep {
pub fn gen_source(
self, worker_index: Option<usize>,
) -> Box<dyn Iterator<Item = Traverser> + Send> {
let gen_flag =
if let Some(w_index) = worker_index { w_index % self.workers == 0 } else { true };

pub fn gen_source(self, worker_index: usize) -> Box<dyn Iterator<Item = Traverser> + Send> {
let graph = crate::get_graph().unwrap();
let mut v_source = Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Vertex> + Send>;
let mut e_source = Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Edge> + Send>;

if self.return_type == EntityType::Vertex {
if let Some(ref seeds) = self.src {
// work 0 in current server are going to get_vertex
if gen_flag {
if let Some(src) = seeds.get(self.server_index as usize) {
if !src.is_empty() {
v_source = graph
.get_vertex(src, &self.v_params)
.unwrap_or(Box::new(std::iter::empty()));
}
if let Some(src) = seeds.get(worker_index) {
if !src.is_empty() {
v_source = graph
.get_vertex(src, &self.v_params)
.unwrap_or(Box::new(std::iter::empty()));
}
}
} else {
// work 0 in current server are going to scan_vertex
if gen_flag {
// worker 0 is going to scan
if worker_index % self.workers == 0 {
v_source =
graph.scan_vertex(&self.v_params).unwrap_or(Box::new(std::iter::empty()))
}
};
} else {
if let Some(ref seeds) = self.src {
// work 0 in current server are going to get_edge
if gen_flag {
if let Some(src) = seeds.get(self.server_index as usize) {
if !src.is_empty() {
e_source = graph
.get_edge(src, &self.e_params)
.unwrap_or(Box::new(std::iter::empty()));
}
if let Some(src) = seeds.get(worker_index) {
if !src.is_empty() {
e_source = graph
.get_edge(src, &self.e_params)
.unwrap_or(Box::new(std::iter::empty()));
}
}
} else {
// work 0 in current server are going to scan_edges
if gen_flag {
// worker 0 is going to scan
if worker_index % self.workers == 0 {
e_source =
graph.scan_edge(&self.e_params).unwrap_or(Box::new(std::iter::empty()));
}
Expand Down Expand Up @@ -176,7 +155,7 @@ impl GraphVertexStep {
}

pub fn graph_step_from(
gremlin_step: &mut pb::GremlinStep, num_servers: usize,
gremlin_step: &mut pb::GremlinStep, partitioner: Arc<dyn Partitioner>,
) -> Result<GraphVertexStep, BuildJobError> {
if let Some(option) = gremlin_step.step.take() {
match option {
Expand All @@ -192,7 +171,7 @@ pub fn graph_step_from(
ids.push(id);
}
if !ids.is_empty() {
step.set_src(ids, num_servers, return_type);
step.set_src(ids, partitioner);
}
let labels = std::mem::replace(&mut opt.labels, vec![]);
if let Some(ref test) = opt.predicates {
Expand Down
10 changes: 4 additions & 6 deletions research/gaia/gremlin/gremlin_core/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,16 +295,14 @@ pub mod test {
.map_err(|e| format!("protobuf decode failure: {}", e))?;
if let Some(worker_id) = pegasus::get_current_worker() {
let num_workers = worker_id.peers as usize / self.inner.get_num_servers();
let mut step = graph_step_from(&mut step, self.inner.get_num_servers())?;
let mut step = graph_step_from(&mut step, self.inner.get_partitioner())?;
step.set_num_workers(num_workers);
step.set_server_index(self.inner.get_server_index());
step.set_requirement(self.requirement);
Ok(step.gen_source(Some(worker_id.index as usize)))
Ok(step.gen_source(worker_id.index as usize))
} else {
let mut step = graph_step_from(&mut step, self.inner.get_num_servers())?;
step.set_server_index(self.inner.get_server_index());
let mut step = graph_step_from(&mut step, self.inner.get_partitioner())?;
step.set_requirement(self.requirement);
Ok(step.gen_source(None))
Ok(step.gen_source(self.inner.get_server_index() as usize))
}
}

Expand Down

0 comments on commit 0ad8b42

Please sign in to comment.