Skip to content

Commit

Permalink
fix the join bug (#1608)
Browse files Browse the repository at this point in the history
  • Loading branch information
wzbxpy committed May 24, 2022
1 parent 4b33307 commit 0d68477
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 43 deletions.
48 changes: 17 additions & 31 deletions research/engine/pegasus/pegasus/src/operator/concise/keyed/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ impl<L: Data + HasKey, R: Data + HasKey> Helper<L, R> {
fn insert_and_query<'a, L: Data + HasKey, R: Data + HasKey<Target = L::Target>>(
map1: &mut JoinMap<L>, map2: &'a mut JoinMap<R>, data: &L, need_insert: bool,
) -> Option<&'a Vec<R>>
where
L::Target: Clone + Send,
where
L::Target: Clone + Send,
{
let k = data.get_key();
let entry1 = map1
Expand Down Expand Up @@ -125,9 +125,7 @@ fn try_outer_join_output<L: Data + HasKey, R: Data + HasKey>(
// The case of left/full outer join, which must output a <L, None> for any left item
// that has not been matched (`entry.indicator = false`)
for entry in map.values().filter(|entry| !entry.indicator) {
for item in &entry.data {
session.give((Some(item.clone()), None))?;
}
session.give_iterator(entry.data.clone().into_iter().map(|l|(Some(l),None)))?;
}
}
}
Expand All @@ -136,9 +134,7 @@ fn try_outer_join_output<L: Data + HasKey, R: Data + HasKey>(
// The case of right/full outer join, which must output a <None, R> for any right item
// that has not been matched (`entry.indicator = false`)
for entry in map.values().filter(|entry| !entry.indicator) {
for item in &entry.data {
session.give((None, Some(item.clone())))?;
}
session.give_iterator(entry.data.clone().into_iter().map(|r|(None,Some(r))))?;
}
}
}
Expand All @@ -160,9 +156,7 @@ fn try_semi_join_output<L: Data + HasKey, R: Data + HasKey>(
// Here, `entry.indicator ^ is_anti` does the above assertion.
.filter(|entry| entry.indicator ^ is_anti)
{
for item in &entry.data {
session.give(item.clone())?;
}
session.give_iterator(entry.data.clone().into_iter())?;
}
}
helper.right_map.remove(tag);
Expand All @@ -172,8 +166,8 @@ fn try_semi_join_output<L: Data + HasKey, R: Data + HasKey>(
fn internal_inner_join<L: Data + HasKey, R: Data + HasKey<Target = L::Target>>(
this: Stream<L>, other: Stream<R>,
) -> Result<Stream<(L, R)>, BuildJobError>
where
L::Target: Clone + Send,
where
L::Target: Clone + Send,
{
this.partition_by_key()
.binary("inner_join", other, |info| {
Expand All @@ -184,9 +178,7 @@ where
let (mut l_map, mut r_map, _, need_insert) = helper.get_maps_mut(&dataset.tag);
for l in dataset.drain() {
if let Some(arr) = insert_and_query(&mut l_map, &mut r_map, &l, need_insert) {
for r in arr {
session.give((l.clone(), r.clone()))?;
}
session.give_iterator(arr.clone().into_iter().map(move|r|(l.clone(),r)))?;
}
}
if dataset.is_last() {
Expand All @@ -199,9 +191,7 @@ where
let (mut l_map, mut r_map, need_insert, _) = helper.get_maps_mut(&dataset.tag);
for r in dataset.drain() {
if let Some(arr) = insert_and_query(&mut r_map, &mut l_map, &r, need_insert) {
for l in arr {
session.give((l.clone(), r.clone()))?;
}
session.give_iterator(arr.clone().into_iter().map(move|l|(l,r.clone())))?;
}
}
if dataset.is_last() {
Expand All @@ -216,8 +206,8 @@ where
fn internal_outer_join<L: Data + HasKey, R: Data + HasKey<Target = L::Target>>(
this: Stream<L>, other: Stream<R>, join_type: JoinType,
) -> Result<Stream<(Option<L>, Option<R>)>, BuildJobError>
where
L::Target: Clone + Send,
where
L::Target: Clone + Send,
{
let (output_left, output_right) = match join_type {
JoinType::LeftOuter => (true, false),
Expand All @@ -234,11 +224,9 @@ where
let (mut l_map, mut r_map, _, need_insert) = helper.get_maps_mut(&dataset.tag);
for l in dataset.drain() {
if let Some(arr) =
insert_and_query(&mut l_map, &mut r_map, &l, output_left || need_insert)
insert_and_query(&mut l_map, &mut r_map, &l, output_left || need_insert)
{
for r in arr {
session.give((Some(l.clone()), Some(r.clone())))?;
}
session.give_iterator(arr.clone().into_iter().map(move|r|(Some(l.clone()),Some(r))))?;
}
}
if dataset.is_last() {
Expand All @@ -258,11 +246,9 @@ where
let (mut l_map, mut r_map, need_insert, _) = helper.get_maps_mut(&dataset.tag);
for r in dataset.drain() {
if let Some(arr) =
insert_and_query(&mut r_map, &mut l_map, &r, output_right || need_insert)
insert_and_query(&mut r_map, &mut l_map, &r, output_right || need_insert)
{
for l in arr {
session.give((Some(l.clone()), Some(r.clone())))?;
}
session.give_iterator(arr.clone().into_iter().map(move|l|(Some(l),Some(r.clone()))))?;
}
}
if dataset.is_last() {
Expand All @@ -285,8 +271,8 @@ where
fn internal_semi_join<L: Data + HasKey, R: Data + HasKey<Target = L::Target>>(
this: Stream<L>, other: Stream<R>, join_type: JoinType,
) -> Result<Stream<L>, BuildJobError>
where
L::Target: Clone + Send,
where
L::Target: Clone + Send,
{
let is_anti = match join_type {
JoinType::Semi => false,
Expand Down
45 changes: 33 additions & 12 deletions research/engine/pegasus/pegasus/tests/join_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fn join_test_key_by() {
.sink_into(output)
}
})
.expect("run job failure;");
.expect("run job failure;");

let mut result = result.next().unwrap().unwrap();
result.sort_by_key(|x| x.0 .0);
Expand All @@ -90,7 +90,7 @@ fn join_test_keyed() {
.sink_into(output)
}
})
.expect("run job failure;");
.expect("run job failure;");

let mut result = result.next().unwrap().unwrap();
result.sort_by_key(|x| x.0.item);
Expand All @@ -105,6 +105,27 @@ fn join_test_keyed() {
);
}

#[test]
fn join_test_with_same_key() {
let mut conf = JobConf::new("inner_join");
conf.set_workers(1);
let mut result = pegasus::run(conf, || {
let id = pegasus::get_current_worker().index;
move |input, output| {
let (src1, src2) = input.input_from(0..1000 as i32)?.copied()?;
src1.key_by(|x| Ok((1 as i32, x)))?
.partition_by_key()
.inner_join(src2.key_by(|x| Ok((1 as i32, x)))?.partition_by_key())?
.map(|(d1, d2)| Ok(((d1.key, d1.value), (d2.key, d2.value))))?
.collect::<Vec<((i32, i32), (i32, i32))>>()?
.sink_into(output)
}
})
.expect("run job failure;");

let mut result = result.next().unwrap().unwrap();
assert_eq!(result.len(), 1000*1000);
}
/*
#[test]
fn join_test_key_by_with_keyed() {
Expand Down Expand Up @@ -158,7 +179,7 @@ fn join_test_empty_stream() {
.sink_into(output)
}
})
.expect("run job failure;");
.expect("run job failure;");

let mut result = result.next().unwrap().unwrap();
result.sort_by_key(|x| x.0 .0);
Expand All @@ -181,7 +202,7 @@ fn join_test_outer() {
.sink_into(output)
}
})
.expect("run job failure;");
.expect("run job failure;");

let mut result = result.next().unwrap().unwrap();
result.sort();
Expand Down Expand Up @@ -216,7 +237,7 @@ fn join_test_semi() {
.sink_into(output)
}
})
.expect("run job failure;");
.expect("run job failure;");

let mut result = result.next().unwrap().unwrap();
result.sort();
Expand All @@ -239,7 +260,7 @@ fn join_test_anti() {
.sink_into(output)
}
})
.expect("run job failure;");
.expect("run job failure;");

let mut result = result.next().unwrap().unwrap();
result.sort();
Expand All @@ -261,11 +282,11 @@ fn join_test_different_tag_outer() {
let src2 = src2.key_by(|x| Ok((x, x)))?.partition_by_key();
src1.full_outer_join(src2)?.count()
})?
.collect::<Vec<(u64, u64)>>()?
.sink_into(output)
.collect::<Vec<(u64, u64)>>()?
.sink_into(output)
}
})
.expect("run job failure;");
.expect("run job failure;");

// println!("{:?}", result.next().unwrap());
let mut result = result.next().unwrap().unwrap();
Expand All @@ -288,11 +309,11 @@ fn join_test_different_tag_semi() {
let src2 = src2.key_by(|x| Ok((x, x)))?.partition_by_key();
src1.semi_join(src2)?.count()
})?
.collect::<Vec<(u64, u64)>>()?
.sink_into(output)
.collect::<Vec<(u64, u64)>>()?
.sink_into(output)
}
})
.expect("run job failure;");
.expect("run job failure;");

// println!("{:?}", result.next().unwrap());
let mut result = result.next().unwrap().unwrap();
Expand Down

0 comments on commit 0d68477

Please sign in to comment.