Skip to content

Commit

Permalink
[GIE-GAIA] Add Pegasus named apis, and add names for IR Operators (#1834
Browse files Browse the repository at this point in the history
)

* [Gaia/IR] Add `MapWithName` api.

* [IR Runtime] add name for map-related algebra op in runtime, and some code refine

* [Gaia/IR] Remove `MapWithName` trait and move all `xx_with_name()` functions into `Map` trait

Co-authored-by: longbin.lailb <longbin.lailb@alibaba-inc.com>
  • Loading branch information
BingqingLyu and longbinlai committed Jul 14, 2022
1 parent 5698d98 commit a756bc1
Show file tree
Hide file tree
Showing 21 changed files with 261 additions and 242 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};

use pegasus::api::{Iteration, Map, Reduce, Sink};
use pegasus::resource::{DefaultParResource, DistributedParResource};
use pegasus::resource::DistributedParResource;
use pegasus::{Configuration, JobConf, ServerConf};
use structopt::StructOpt;

Expand Down
28 changes: 28 additions & 0 deletions research/engine/pegasus/pegasus/src/api/concise/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ pub trait Map<I: Data> {
/// assert_eq!(expected, [2, 4, 6, 8, 10, 12, 14, 16, 18]);
/// ```
fn map<O, F>(self, func: F) -> Result<Stream<O>, BuildJobError>
where
O: Data,
F: Fn(I) -> FnResult<O> + Send + 'static,
Self: Sized,
{
self.map_with_name("", func)
}

fn map_with_name<O, F>(self, name: &str, func: F) -> Result<Stream<O>, BuildJobError>
where
O: Data,
F: Fn(I) -> FnResult<O> + Send + 'static;
Expand Down Expand Up @@ -92,6 +101,15 @@ pub trait Map<I: Data> {
/// assert_eq!(expected, [5, 9, 13, 17]);
/// ```
fn filter_map<O, F>(self, func: F) -> Result<Stream<O>, BuildJobError>
where
O: Data,
F: Fn(I) -> FnResult<Option<O>> + Send + 'static,
Self: Sized,
{
self.filter_map_with_name("", func)
}

fn filter_map_with_name<O, F>(self, name: &str, func: F) -> Result<Stream<O>, BuildJobError>
where
O: Data,
F: Fn(I) -> FnResult<Option<O>> + Send + 'static;
Expand Down Expand Up @@ -133,6 +151,16 @@ pub trait Map<I: Data> {
/// assert_eq!(expected, [1, 1, 2, 3, 3, 4, 5, 5, 7]);
/// ```
fn flat_map<O, R, F>(self, func: F) -> Result<Stream<O>, BuildJobError>
where
O: Data,
R: Iterator<Item = O> + Send + 'static,
F: Fn(I) -> FnResult<R> + Send + 'static,
Self: Sized,
{
self.flat_map_with_name("", func)
}

fn flat_map_with_name<O, R, F>(self, name: &str, func: F) -> Result<Stream<O>, BuildJobError>
where
O: Data,
R: Iterator<Item = O> + Send + 'static,
Expand Down
4 changes: 2 additions & 2 deletions research/engine/pegasus/pegasus/src/data_plane/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ mod test {
// build two channels for each worker
let mut threads = vec![];
let mut index = 0;
let mut start_index = 2;
let start_index = 2;
for server in server_list {
threads.push(std::thread::spawn(move || {
pegasus_network::start_up(
Expand Down Expand Up @@ -562,7 +562,7 @@ mod test {
// build two channels for each worker
let mut threads = vec![];
let mut index = 0;
let mut start_index = 4;
let start_index = 4;
for server in server_list {
threads.push(std::thread::spawn(move || {
pegasus_network::start_up(
Expand Down
22 changes: 16 additions & 6 deletions research/engine/pegasus/pegasus/src/operator/concise/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,23 @@ use crate::errors::BuildJobError;
use crate::stream::Stream;
use crate::Data;

/// A private function of getting a `Map` operator's name as a base name given by the system
/// plus an extra name given by the user
fn _get_name(base: &str, extra: &str) -> String {
if extra.is_empty() {
base.to_string()
} else {
format!("{:?} [{:?}]", base, extra)
}
}

impl<I: Data> Map<I> for Stream<I> {
fn map<O, F>(self, func: F) -> Result<Stream<O>, BuildJobError>
fn map_with_name<O, F>(self, name: &str, func: F) -> Result<Stream<O>, BuildJobError>
where
O: Data,
F: Fn(I) -> FnResult<O> + Send + 'static,
{
self.unary("map", |_info| {
self.unary(&_get_name("map", name), |_info| {
move |input, output| {
input.for_each_batch(|batch| {
if !batch.is_empty() {
Expand All @@ -41,12 +51,12 @@ impl<I: Data> Map<I> for Stream<I> {
})
}

fn filter_map<O, F>(self, func: F) -> Result<Stream<O>, BuildJobError>
fn filter_map_with_name<O, F>(self, name: &str, func: F) -> Result<Stream<O>, BuildJobError>
where
O: Data,
F: Fn(I) -> FnResult<Option<O>> + Send + 'static,
{
self.unary("filter_map", |_info| {
self.unary(&_get_name("filter_map", name), |_info| {
move |input, output| {
input.for_each_batch(|batch| {
if !batch.is_empty() {
Expand All @@ -63,13 +73,13 @@ impl<I: Data> Map<I> for Stream<I> {
})
}

fn flat_map<O, R, F>(self, func: F) -> Result<Stream<O>, BuildJobError>
fn flat_map_with_name<O, R, F>(self, name: &str, func: F) -> Result<Stream<O>, BuildJobError>
where
O: Data,
R: Iterator<Item = O> + Send + 'static,
F: Fn(I) -> FnResult<R> + Send + 'static,
{
self.unary("flat_map", |info| {
self.unary(&_get_name("flat_map", name), |info| {
let index = info.index;
move |input, output| {
input.for_each_batch(|dataset| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use crate::{BuildJobError, Data};

mod feedback;
mod switch;
use crate::api::iteration::EmitKind;
use feedback::FeedbackOperator;
use switch::SwitchOperator;

use crate::api::iteration::EmitKind;
use crate::macros::map::FnResult;

impl<D: Data> Iteration<D> for Stream<D> {
Expand Down
3 changes: 1 addition & 2 deletions research/engine/pegasus/pegasus/tests/join_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ fn join_test_with_same_key() {
let mut conf = JobConf::new("inner_join");
conf.set_workers(1);
let mut result = pegasus::run(conf, || {
let id = pegasus::get_current_worker().index;
move |input, output| {
let (src1, src2) = input.input_from(0..1000 as i32)?.copied()?;
src1.key_by(|x| Ok((1 as i32, x)))?
Expand All @@ -126,7 +125,7 @@ fn join_test_with_same_key() {
})
.expect("run job failure;");

let mut result = result.next().unwrap().unwrap();
let result = result.next().unwrap().unwrap();
assert_eq!(result.len(), 1000 * 1000);
}
/*
Expand Down
30 changes: 30 additions & 0 deletions research/query_service/ir/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use dyn_type::{Object, Primitives};

use crate::error::ParsePbError;
use crate::generated::algebra as pb;
use crate::generated::algebra::logical_plan::operator::Opr;
use crate::generated::common as common_pb;
use crate::NameOrId;

Expand Down Expand Up @@ -570,6 +571,35 @@ impl From<Object> for common_pb::Value {
}
}

impl pb::logical_plan::operator::Opr {
pub fn get_name(&self) -> String {
let name = match self {
Opr::Project(_) => "Project",
Opr::Select(_) => "Select",
Opr::Join(_) => "Join",
Opr::Union(_) => "Union",
Opr::GroupBy(_) => "GroupBy",
Opr::OrderBy(_) => "OrderBy",
Opr::Dedup(_) => "Dedup",
Opr::Unfold(_) => "Unfold",
Opr::Apply(_) => "Apply",
Opr::SegApply(_) => "SegApply",
Opr::Scan(_) => "Scan",
Opr::Limit(_) => "Limit",
Opr::Auxilia(_) => "Auxilia",
Opr::As(_) => "As",
Opr::Sink(_) => "Sink",
Opr::Vertex(_) => "GetV",
Opr::Edge(_) => "EdgeExpand",
Opr::Path(_) => "PathExpand",
Opr::PathStart(_) => "PathStart",
Opr::PathEnd(_) => "PathEnd",
Opr::Pattern(_) => "Pattern",
};
name.to_string()
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
6 changes: 2 additions & 4 deletions research/query_service/ir/integrated/tests/auxilia_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@ mod test {
fn source_gen(alias: Option<common_pb::NameOrId>) -> Box<dyn Iterator<Item = Record> + Send> {
create_exp_store();
let scan_opr_pb = pb::Scan { scan_opt: 0, alias, params: None, idx_predicate: None };
let mut source_opr_pb =
pb::logical_plan::Operator { opr: Some(pb::logical_plan::operator::Opr::Scan(scan_opr_pb)) };
let source_opr_pb = pb::logical_plan::operator::Opr::Scan(scan_opr_pb);
let source =
SourceOperator::new(&mut source_opr_pb, 1, 1, Arc::new(SimplePartition { num_servers: 1 }))
.unwrap();
SourceOperator::new(source_opr_pb, 1, 1, Arc::new(SimplePartition { num_servers: 1 })).unwrap();
source.gen_source(0).unwrap()
}

Expand Down
6 changes: 2 additions & 4 deletions research/query_service/ir/integrated/tests/expand_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@ mod test {
fn source_gen(alias: Option<common_pb::NameOrId>) -> Box<dyn Iterator<Item = Record> + Send> {
create_exp_store();
let scan_opr_pb = pb::Scan { scan_opt: 0, alias, params: None, idx_predicate: None };
let mut source_opr_pb =
pb::logical_plan::Operator { opr: Some(pb::logical_plan::operator::Opr::Scan(scan_opr_pb)) };
let source_opr_pb = pb::logical_plan::operator::Opr::Scan(scan_opr_pb);
let source =
SourceOperator::new(&mut source_opr_pb, 1, 1, Arc::new(SimplePartition { num_servers: 1 }))
.unwrap();
SourceOperator::new(source_opr_pb, 1, 1, Arc::new(SimplePartition { num_servers: 1 })).unwrap();
source.gen_source(0).unwrap()
}

Expand Down
6 changes: 2 additions & 4 deletions research/query_service/ir/integrated/tests/scan_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ mod test {
// g.V()
fn scan_gen(scan_opr_pb: pb::Scan) -> Box<dyn Iterator<Item = Record> + Send> {
create_exp_store();
let mut source_opr_pb =
pb::logical_plan::Operator { opr: Some(pb::logical_plan::operator::Opr::Scan(scan_opr_pb)) };
let source_opr_pb = pb::logical_plan::operator::Opr::Scan(scan_opr_pb);
let source =
SourceOperator::new(&mut source_opr_pb, 1, 1, Arc::new(SimplePartition { num_servers: 1 }))
.unwrap();
SourceOperator::new(source_opr_pb, 1, 1, Arc::new(SimplePartition { num_servers: 1 })).unwrap();
source.gen_source(0).unwrap()
}

Expand Down
Loading

0 comments on commit a756bc1

Please sign in to comment.